[Spring Cloud Eureka] EurekaClient - Digging into cacheRefresh and heartbeat (2)
    Development/Spring Cloud 2020. 12. 21. 15:33

    Previous post

    [Spring Cloud Eureka] EurekaClient - Digging into cacheRefresh and heartbeat (1)


    Previously we followed how the eurekaClient method, declared in RefreshableEurekaClientConfiguration inside EurekaClientAutoConfiguration, gets created as a Bean. Along the way we obtained the Applications registered on the EurekaServer. We also looked at AsyncResolver, which keeps updating the EurekaServer list (ZoneEndpoints) every 5 minutes (currently based on application.yml).

    Continuing from there, let's look at how the EurekaClient talks to the EurekaServer to fetch the list of clients (EurekaClientList), and how it sends heartbeats to the EurekaServer.

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();

    About CacheRefreshTask

    /**
         * Initializes all scheduled tasks.
         */
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                );
                scheduler.schedule(
                        cacheRefreshTask,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }

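    clientConfig.shouldFetchRegistry() decides whether this Eureka client fetches the registry (the list of registered applications) from the Eureka server and keeps a local copy. It is set by eureka.client.fetch-registry (default: true).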

    If it does, cacheRefreshTask is registered with the scheduler.

    - eureka.client.registry-fetch-interval-seconds (default: 30s) sets the interval. It serves as both the task timeout and the schedule delay.

    - eureka.client.cache-refresh-executor-exponential-back-off-bound (default: 10) is multiplied by registry-fetch-interval-seconds. The product is the maximum delay between runs when the task keeps timing out.
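    With the defaults, the two values work out as shown below. This is a small arithmetic sketch. It assumes, based on my reading of the TimedSupervisorTask constructor, that the timeout is the interval converted to milliseconds and that maxDelay is that timeout multiplied by the back-off bound. CacheRefreshTiming is a made-up name for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper reproducing how TimedSupervisorTask appears to derive its timing values.
final class CacheRefreshTiming {
    // Timeout (and initial delay) in ms, from registry-fetch-interval-seconds.
    static long timeoutMillis(int intervalSeconds) {
        return TimeUnit.SECONDS.toMillis(intervalSeconds);
    }

    // Upper bound for the delay: timeout * cache-refresh-executor-exponential-back-off-bound.
    static long maxDelayMillis(int intervalSeconds, int expBackOffBound) {
        return timeoutMillis(intervalSeconds) * expBackOffBound;
    }
}
```

    An interval of 30 seconds and a bound of 10 give a 30,000 ms timeout and a 300,000 ms (5 minute) maximum delay.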

    @Override
        public void run() {
            Future<?> future = null;
            try {
                future = executor.submit(task);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
                delay.set(timeoutMillis);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                successCounter.increment();

    This is the run() method of TimedSupervisorTask. It submits the task with executor.submit and gets a future back. It then blocks on future.get until the task finishes or the timeout configured above expires.

    } finally {
                if (future != null) {
                    future.cancel(true);
                }
    
                if (!scheduler.isShutdown()) {
                    scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
                }
            }

    In the finally block, the task then reschedules itself on the scheduler to run again after delay.
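    The excerpt above skips the catch blocks. As far as I can tell from the Eureka source, the TimeoutException branch doubles delay, capped at maxDelay. A successful run resets it to timeoutMillis (the delay.set(timeoutMillis) line).

    Below is a simplified, single-threaded model of only that delay bookkeeping. BackoffDelay is a hypothetical name. The real class also submits the task to a thread pool and reschedules itself.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of TimedSupervisorTask's delay handling (hypothetical class, no threads).
final class BackoffDelay {
    private final long timeoutMillis;
    private final long maxDelayMillis;
    private final AtomicLong delay;

    BackoffDelay(long timeoutMillis, int expBackOffBound) {
        this.timeoutMillis = timeoutMillis;
        this.maxDelayMillis = timeoutMillis * expBackOffBound;
        this.delay = new AtomicLong(timeoutMillis);
    }

    // Task finished within the timeout: go back to the normal interval.
    void onSuccess() {
        delay.set(timeoutMillis);
    }

    // Task timed out: double the next delay, but never beyond maxDelayMillis.
    void onTimeout() {
        long current = delay.get();
        long next = Math.min(maxDelayMillis, current * 2);
        delay.compareAndSet(current, next);
    }

    // Delay the scheduler would use for the next run (the finally block).
    long nextDelayMillis() {
        return delay.get();
    }
}
```

    With the defaults, consecutive timeouts push the next run out to 60s, 120s, 240s, and then 300s. It stays at 300s until a run succeeds.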

    Now let's look at the task.

    The task is a CacheRefreshThread.

    /**
         * The task that fetches the registry information at specified intervals.
         *
         */
        class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }

     

    refreshRegistry

    @VisibleForTesting
        void refreshRegistry() {
            try {
                // ...
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }

    The fetchRegistry we saw last time shows up again.

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    // ...
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }

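    This time, however, applications were already fetched during initialization. As long as none of the other conditions hold, the else branch runs getAndUpdateDelta(applications).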
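    The condition in fetchRegistry can be read as a single predicate. Here is a minimal sketch. The class name and parameter names are hypothetical stand-ins for the clientConfig and Applications calls in the excerpt.

```java
// Hypothetical restatement of the full-vs-delta decision in DiscoveryClient.fetchRegistry.
final class FetchDecision {
    static boolean shouldFetchFullRegistry(boolean disableDelta,
                                           String registryRefreshSingleVipAddress,
                                           boolean forceFullRegistryFetch,
                                           Integer registeredAppCount, // null: nothing cached yet
                                           long appsVersion) {
        return disableDelta
                || (registryRefreshSingleVipAddress != null && !registryRefreshSingleVipAddress.isEmpty())
                || forceFullRegistryFetch
                || registeredAppCount == null
                || registeredAppCount == 0
                || appsVersion == -1; // client does not support delta
    }
}
```

    The periodic refresh takes the delta path only when the local cache already holds applications with a known version, and nothing forces a full fetch.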

    /**
         * Get the delta registry information from the eureka server and update it locally.
         * When applying the delta, the following flow is observed:
         *
         * if (update generation have not advanced (due to another thread))
         *   atomically try to: update application with the delta and get reconcileHashCode
         *   abort entire processing otherwise
         *   do reconciliation if reconcileHashCode clash
         * fi
         *
         * @return the client response
         * @throws Throwable on error
         */
        private void getAndUpdateDelta(Applications applications) throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            Applications delta = null;
            EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                delta = httpResponse.getEntity();
            }
    
            if (delta == null) {
                logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                        + "Hence got the full registry.");
                getAndStoreFullRegistry();
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
                String reconcileHashCode = "";
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
                        updateDelta(delta);
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } else {
                    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
                }
                // There is a diff in number of instances for some reason
                if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                    reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
                }
            } else {
                logger.warn("Not updating application delta as another thread is updating it already");
                logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
            }
        }

    It calls the /eureka/apps/delta API on the EurekaServer and receives the response.
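    You can call the same endpoint by hand to see what the client receives. Here is a sketch using the JDK 11 HttpClient. It assumes the server from this post runs at http://localhost:8761 with the default /eureka path. The Accept header asks for JSON instead of XML. DeltaQuery is a hypothetical name.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for inspecting the delta endpoint manually.
final class DeltaQuery {
    // Assumption: eurekaBaseUrl looks like http://localhost:8761/eureka
    static HttpRequest deltaRequest(String eurekaBaseUrl) {
        return HttpRequest.newBuilder(URI.create(eurekaBaseUrl + "/apps/delta"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // Sends the request and returns the raw body; needs a running Eureka server.
    static String fetchDelta(String eurekaBaseUrl) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(deltaRequest(eurekaBaseUrl), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

    For example, DeltaQuery.fetchDelta("http://localhost:8761/eureka") returns the same delta document the client parses.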

    Judging from the responses, the delta seems to hold the applications that changed recently on the EurekaServer, such as newly registered ones.

    (Screenshot: the Eureka dashboard at localhost:8761)

    Nothing is registered yet.

    At this point the delta (newly registered applications) is still empty, and only the versions field is shown.

    Once an application has been registered, the delta is filled in properly:

    <versions__delta>2</versions__delta>
    <apps__hashcode>UP_1_</apps__hashcode>

    versions__delta seems to increase every time the state of the applications is updated. apps__hashcode summarizes how many instances are in each status, so UP_1_ means one instance is currently UP.
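    As I understand Applications.getReconcileHashCode, the value is built in three steps. It counts instances per status, sorts the statuses alphabetically (via a TreeMap), and appends STATUS_count_ for each one.

    The sketch below is a standalone re-implementation for illustration, not the Eureka class. ReconcileHash is a hypothetical name, and the input is simply a list of status strings.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical re-implementation of the apps__hashcode format.
final class ReconcileHash {
    static String of(List<String> instanceStatuses) {
        // TreeMap keeps statuses sorted, so the result does not depend on input order.
        Map<String, Integer> countByStatus = new TreeMap<>();
        for (String status : instanceStatuses) {
            countByStatus.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : countByStatus.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }
}
```

    One UP instance yields UP_1_, and two UP instances plus one DOWN instance yield DOWN_1_UP_2_. With no instances at all, the string is empty.

    In getAndUpdateDelta, the client computes this string locally after applying the delta and compares it with delta.getAppsHashCode(). A mismatch triggers reconcileAndLogDifference.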

     

    If USER-SVC is shut down in this state,

    the version goes up by 3 (it seems to pass through three steps between UP and shut down). apps__hashcode is now empty because no instance is left to count, and actionType becomes DELETED.

    Each EurekaClient appears to use these values to refresh (update) its local cache.

     

    So let's look at the updateDelta method.

    /**
         * Updates the delta information fetches from the eureka server into the
         * local cache.
         *
         * @param delta
         *            the delta information received from eureka server in the last
         *            poll cycle.
         */
        private void updateDelta(Applications delta) {
            int deltaCount = 0;
            for (Application app : delta.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    Applications applications = getApplications();
                    String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                    if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                        Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                        if (null == remoteApps) {
                            remoteApps = new Applications();
                            remoteRegionVsApps.put(instanceRegion, remoteApps);
                        }
                        applications = remoteApps;
                    }
    
                    ++deltaCount;
                    if (ActionType.ADDED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                    } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Modified instance {} to the existing apps ", instance.getId());
    
                        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
    
                    } else if (ActionType.DELETED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp != null) {
                            logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                            existingApp.removeInstance(instance);
                            /*
                             * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                             * if instance list is empty, we remove the application.
                             */
                            if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                                applications.removeApplication(existingApp);
                            }
                        }
                    }
                }
            }
            logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
    
            getApplications().setVersion(delta.getVersion());
            getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    
            for (Applications applications : remoteRegionVsApps.values()) {
                applications.setVersion(delta.getVersion());
                applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            }
        }
    public Applications getApplications() {
            return localRegionApps.get();
    }

    Using the data from the delta API, it adds, modifies, or removes entries in applications (localRegionApps) according to each instance's ActionType. It also syncs the local version with the delta's version.
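    Stripped of regions and shuffling, the ADDED/MODIFIED/DELETED handling comes down to a few map operations. Here is a simplified sketch. It uses a hypothetical nested map (app name, then instance id, then status) in place of the Applications and Application classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified local cache; not the Eureka Applications class.
final class LocalRegistry {
    enum ActionType { ADDED, MODIFIED, DELETED }

    // app name -> (instance id -> status)
    final Map<String, Map<String, String>> apps = new HashMap<>();

    void apply(ActionType action, String appName, String instanceId, String status) {
        switch (action) {
            case ADDED:
            case MODIFIED:
                // Both create the app if missing, then put (or overwrite) the instance.
                apps.computeIfAbsent(appName, k -> new HashMap<>()).put(instanceId, status);
                break;
            case DELETED:
                Map<String, String> instances = apps.get(appName);
                if (instances != null) {
                    instances.remove(instanceId);
                    // Drop the application once its last instance is gone.
                    if (instances.isEmpty()) {
                        apps.remove(appName);
                    }
                }
                break;
        }
    }
}
```

    ADDED and MODIFIED are handled identically: in both cases the instance from the delta simply replaces whatever the cache held.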

     

    About heartbeatTask

    Next is the heartbeat. As the name suggests, it periodically reports this instance to the EurekaServer. In code terms, it renews the instance's lease.

    if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // Heartbeat timer
                heartbeatTask = new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                );
                scheduler.schedule(
                        heartbeatTask,
                        renewalIntervalInSecs, TimeUnit.SECONDS);

    eureka.instance.lease-renewal-interval-in-seconds (default: 30s): the heartbeat interval

    eureka.client.heartbeat-executor-exponential-back-off-bound (default: 10): the multiplier used to compute maxDelay (interval * bound)

     

    /**
         * The heartbeat task that renews the lease in the given intervals.
         */
        private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
    boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }

    renew() calls the sendHeartBeat method of eurekaTransport.registrationClient.
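    Stripped of logging and metrics, renew() is a small decision on the status code. A 404 means the server no longer knows this instance (for example, after the server restarted), so the client re-registers. Otherwise only 200 counts as success.

    Here is a sketch in which the HTTP call and register() are replaced by parameters. RenewOutcome is a hypothetical name.

```java
import java.util.function.BooleanSupplier;

// Hypothetical restatement of the status handling in DiscoveryClient.renew().
final class RenewOutcome {
    static boolean handle(int heartbeatStatus, BooleanSupplier register) {
        if (heartbeatStatus == 404) {
            // Server does not know this instance: register again and report that result.
            return register.getAsBoolean();
        }
        return heartbeatStatus == 200;
    }
}
```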

    AbstractJerseyEurekaHttpClient

    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
            String urlPath = "apps/" + appName + '/' + id;
            ClientResponse response = null;
            try {
                WebResource webResource = jerseyClient.resource(serviceUrl)
                        .path(urlPath)
                        .queryParam("status", info.getStatus().toString())
                        .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
                if (overriddenStatus != null) {
                    webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
                }
                Builder requestBuilder = webResource.getRequestBuilder();
                addExtraHeaders(requestBuilder);
                response = requestBuilder.put(ClientResponse.class);
                EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
                if (response.hasEntity() &&
                        !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
                    eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
                }
                return eurekaResponseBuilder.build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }

    In the end, the call lands in AbstractJerseyEurekaHttpClient.sendHeartBeat.

    It calls the apps/{appName}/{id} API on the EurekaServer (e.g. apps/DEBUG-SVC/localhost:debug-svc:8080).

    appName and id can be set with eureka.instance.appname and eureka.instance.instance-id, respectively.

    If they are not set, the defaults are determined as follows.

    @Override
    	public void setEnvironment(Environment environment) {
    		this.environment = environment;
    		// set some defaults from the environment, but allow the defaults to use relaxed
    		// binding
    		String springAppName = this.environment.getProperty("spring.application.name",
    				"");
    		if (StringUtils.hasText(springAppName)) {
    			setAppname(springAppName);
    			setVirtualHostName(springAppName);
    			setSecureVirtualHostName(springAppName);
    		}
    	}

    appname is taken from spring.application.name.

    public static String getDefaultInstanceId(PropertyResolver resolver,
    			boolean includeHostname) {
    		String vcapInstanceId = resolver.getProperty("vcap.application.instance_id");
    		if (StringUtils.hasText(vcapInstanceId)) {
    			return vcapInstanceId;
    		}
    
    		String hostname = null;
    		if (includeHostname) {
    			hostname = resolver.getProperty("spring.cloud.client.hostname");
    		}
    		String appName = resolver.getProperty("spring.application.name");
    
    		String namePart = combineParts(hostname, SEPARATOR, appName);
    
    		String indexPart = resolver.getProperty("spring.application.instance_id",
    				resolver.getProperty("server.port"));
    
    		return combineParts(namePart, SEPARATOR, indexPart);
    	}
    private static final String SEPARATOR = ":";

    If vcap.application.instance_id exists, it is used as-is.

    Otherwise the id has the form hostname:appName:indexPart. In the example above (localhost:debug-svc:8080), the hostname is localhost. Since spring.application.instance_id was not set, indexPart fell back to server.port, which produced localhost:debug-svc:8080.
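    To check the composition rules, here is a standalone re-implementation of getDefaultInstanceId. It includes combineParts as I recall it from Spring Cloud Commons' IdUtils.

    A plain Map stands in for Spring's PropertyResolver, and isBlank approximates StringUtils.hasText; both are assumptions for illustration. DefaultInstanceId is a hypothetical name.

```java
import java.util.Map;

// Hypothetical re-implementation of the default instance-id logic shown above.
final class DefaultInstanceId {
    private static final String SEPARATOR = ":";

    // Joins two parts with the separator, skipping whichever part is null.
    static String combineParts(String first, String separator, String second) {
        if (first != null && second != null) {
            return first + separator + second;
        }
        if (first != null) {
            return first;
        }
        return second; // may be null when both parts are missing
    }

    // 'props' stands in for Spring's PropertyResolver (assumption).
    static String of(Map<String, String> props, boolean includeHostname) {
        String vcap = props.get("vcap.application.instance_id");
        if (vcap != null && !vcap.isBlank()) {
            return vcap;
        }
        String hostname = includeHostname ? props.get("spring.cloud.client.hostname") : null;
        String namePart = combineParts(hostname, SEPARATOR, props.get("spring.application.name"));
        // spring.application.instance_id wins; otherwise fall back to server.port.
        String indexPart = props.getOrDefault("spring.application.instance_id", props.get("server.port"));
        return combineParts(namePart, SEPARATOR, indexPart);
    }
}
```

    With hostname localhost, name debug-svc, and port 8080, this yields localhost:debug-svc:8080.

    The app name in the URL appears as DEBUG-SVC, in upper case, because (as far as I know) InstanceInfo upper-cases the app name. The instance id keeps its original case.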

     

                WebResource webResource = jerseyClient.resource(serviceUrl)
                        .path(urlPath)
                        .queryParam("status", info.getStatus().toString())
                        .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
                if (overriddenStatus != null) {
                    webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
                }
                Builder requestBuilder = webResource.getRequestBuilder();
                addExtraHeaders(requestBuilder);
                response = requestBuilder.put(ClientResponse.class);

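    The resulting urlPath (apps/DEBUG-SVC/localhost:debug-svc:8080) is sent to the EurekaServer as a PUT request. The request carries status (UP) and lastDirtyTimestamp (the time the instance info was last marked as changed), and so reports the instance's current state.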
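    The same heartbeat request can be rebuilt with the JDK HttpClient API to see exactly what goes over the wire. Here is a sketch that only builds the request.

    The base URL, the example timestamp, and the name HeartbeatRequest are assumptions for illustration. The values are concatenated without URL encoding, which is fine for these simple strings.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical builder mirroring the PUT issued by sendHeartBeat.
final class HeartbeatRequest {
    // Assumption: eurekaBaseUrl looks like http://localhost:8761/eureka
    static HttpRequest build(String eurekaBaseUrl, String appName, String instanceId,
                             String status, long lastDirtyTimestamp) {
        // No URL encoding: fine for simple ids like localhost:debug-svc:8080.
        URI uri = URI.create(eurekaBaseUrl + "/apps/" + appName + "/" + instanceId
                + "?status=" + status + "&lastDirtyTimestamp=" + lastDirtyTimestamp);
        // Empty-body PUT: the state travels in the path and the query string.
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```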

     

    Once the AsyncResolver, CacheRefresh, and Heartbeat covered above have been initialized, we are finally back in the eurekaClient bean method:

    @Bean(destroyMethod = "shutdown")
    		@ConditionalOnMissingBean(value = EurekaClient.class,
    				search = SearchStrategy.CURRENT)
    		@org.springframework.cloud.context.config.annotation.RefreshScope
    		@Lazy
    		public EurekaClient eurekaClient(ApplicationInfoManager manager,
    				EurekaClientConfig config, EurekaInstanceConfig instance,
    				@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
    			// If we use the proxy of the ApplicationInfoManager we could run into a
    			// problem
    			// when shutdown is called on the CloudEurekaClient where the
    			// ApplicationInfoManager bean is
    			// requested but wont be allowed because we are shutting down. To avoid this
    			// we use the
    			// object directly.
    			ApplicationInfoManager appManager;
    			if (AopUtils.isAopProxy(manager)) {
    				appManager = ProxyUtils.getTargetObject(manager);
    			}
    			else {
    				appManager = manager;
    			}
    			CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
    					config, this.optionalArgs, this.context);
    			cloudEurekaClient.registerHealthCheck(healthCheckHandler);
    			return cloudEurekaClient;
    		}

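    At this point, the cloudEurekaClient has been created.

    Before returning, the method calls registerHealthCheck. The HealthCheckHandler is injected with @Autowired(required = false), so it may be null. When one is available, this call triggers an on-demand InstanceInfo update, which can register the instance with the EurekaServer before the scheduled tasks above first fire.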

    registerHealthCheck

    @Override
        public void registerHealthCheck(HealthCheckHandler healthCheckHandler) {
            if (instanceInfo == null) {
                logger.error("Cannot register a healthcheck handler when instance info is null!");
            }
            if (healthCheckHandler != null) {
                this.healthCheckHandlerRef.set(healthCheckHandler);
                // schedule an onDemand update of the instanceInfo when a new healthcheck handler is registered
                if (instanceInfoReplicator != null) {
                    instanceInfoReplicator.onDemandUpdate();
                }
            }
        }

    onDemandUpdate

    public boolean onDemandUpdate() {
            if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
                if (!scheduler.isShutdown()) {
                    scheduler.submit(new Runnable() {
                        @Override
                        public void run() {
                            logger.debug("Executing on-demand update of local InstanceInfo");
        
                            Future latestPeriodic = scheduledPeriodicRef.get();
                            if (latestPeriodic != null && !latestPeriodic.isDone()) {
                                logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                                latestPeriodic.cancel(false);
                            }
        
                            InstanceInfoReplicator.this.run();
                        }
                    });
                    return true;
                } else {
                    logger.warn("Ignoring onDemand update due to stopped scheduler");
                    return false;
                }
            } else {
                logger.warn("Ignoring onDemand update due to rate limiter");
                return false;
            }
        }
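    onDemandUpdate and run() work together in a "cancel the pending periodic run, run now, reschedule" pattern. Below is a simplified model of that scheduling pattern without the rate limiter. ReplicatorSketch and onDemandUpdateAndWait are hypothetical names. The counter stands in for refreshInstanceInfo() plus register(), and the helper blocks only so the demo is deterministic.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of InstanceInfoReplicator's scheduling (hypothetical; no rate limiter).
final class ReplicatorSketch implements Runnable {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final AtomicReference<Future<?>> scheduledPeriodicRef = new AtomicReference<>();
    final AtomicInteger runs = new AtomicInteger();
    private final long intervalSeconds;

    ReplicatorSketch(long intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
    }

    @Override
    public void run() {
        try {
            runs.incrementAndGet(); // stands in for refreshInstanceInfo() + register()
        } finally {
            // Always queue the next periodic run, like the real run() does.
            scheduledPeriodicRef.set(scheduler.schedule(this, intervalSeconds, TimeUnit.SECONDS));
        }
    }

    // Cancel the pending periodic run, run immediately, and wait for completion (demo only).
    void onDemandUpdateAndWait() {
        Future<?> f = scheduler.submit(() -> {
            Future<?> latest = scheduledPeriodicRef.get();
            if (latest != null && !latest.isDone()) {
                latest.cancel(false);
            }
            run();
        });
        try {
            f.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

    Cancelling the pending run keeps at most one periodic run queued. Without it, every on-demand update would add one more self-rescheduling chain.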

    InstanceInfoReplicator.this.run()

    public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
    
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
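    The isDirtyWithTime/unsetIsDirty pair in run() guards against losing a change that happens while register() is in flight. As I understand InstanceInfo, the flag is cleared only if nothing was marked dirty after the timestamp that was read. Otherwise the next run registers again.

    Below is a minimal model of that idea. DirtyFlag and markDirty are hypothetical names, and timestamps are passed in explicitly to keep the model deterministic.

```java
// Hypothetical model of InstanceInfo's dirty flag with a timestamp guard.
final class DirtyFlag {
    private boolean dirty;
    private long lastDirtyTimestamp;

    synchronized void markDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    // Timestamp of the last change if dirty, otherwise null.
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    // Clears the flag only if no newer change happened after seenTimestamp.
    synchronized void unsetIsDirty(long seenTimestamp) {
        if (lastDirtyTimestamp <= seenTimestamp) {
            dirty = false;
        }
    }
}
```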

    discoveryClient.register()

    /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }

    And once again, the now-familiar eurekaTransport.registrationClient.

    AbstractJerseyEurekaHttpClient.register

    @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }

    It registers itself with the EurekaServer via a POST request. register() treats 204 No Content as success.
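    The registration call can be reproduced the same way: a POST of the instance JSON to apps/{appName}, with 204 as the success status that register() checks.

    In this sketch, the caller supplies the JSON body; building a complete instance document is out of scope here. The base URL and the name RegisterRequest are assumptions for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical builder mirroring the POST issued by AbstractJerseyEurekaHttpClient.register.
final class RegisterRequest {
    // Assumption: eurekaBaseUrl looks like http://localhost:8761/eureka
    static HttpRequest build(String eurekaBaseUrl, String appName, String instanceJson) {
        return HttpRequest.newBuilder(URI.create(eurekaBaseUrl + "/apps/" + appName))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    // Mirrors the last line of DiscoveryClient.register(): only 204 counts as success.
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }
}
```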


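    That wraps up a brief look at CacheRefresh and Heartbeat.

    We didn't dig into every detail of how the EurekaClient registers itself with the EurekaServer and fetches the list of other clients. Still, it was a good chance to get a rough picture of the overall flow.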
