  • [Spring Cloud Eureka] EurekaClient - Digging into cacheRefresh and heartbeat (1)
    Development/Spring Cloud 2020. 12. 16. 17:03

    I roughly traced how a Eureka Client checks, through the Eureka Server, the heartbeat of the ServerList registered on that server, and how the client keeps its own cached copy of the list.

     

    spring-cloud-netflix-eureka-client-2.2.5.RELEASE.jar

    -- org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration

    	@Configuration(proxyBeanMethods = false)
    	@ConditionalOnRefreshScope
    	protected static class RefreshableEurekaClientConfiguration {
    
    		@Autowired
    		private ApplicationContext context;
    
    		@Autowired
    		private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    
    		@Bean(destroyMethod = "shutdown")
    		@ConditionalOnMissingBean(value = EurekaClient.class,
    				search = SearchStrategy.CURRENT)
    		@org.springframework.cloud.context.config.annotation.RefreshScope
    		@Lazy
    		public EurekaClient eurekaClient(ApplicationInfoManager manager,
    				EurekaClientConfig config, EurekaInstanceConfig instance,
    				@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
    			// If we use the proxy of the ApplicationInfoManager we could run into a
    			// problem
    			// when shutdown is called on the CloudEurekaClient where the
    			// ApplicationInfoManager bean is
    			// requested but wont be allowed because we are shutting down. To avoid this
    			// we use the
    			// object directly.
    			ApplicationInfoManager appManager;
    			if (AopUtils.isAopProxy(manager)) {
    				appManager = ProxyUtils.getTargetObject(manager);
    			}
    			else {
    				appManager = manager;
    			}
    			CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
    					config, this.optionalArgs, this.context);
    			cloudEurekaClient.registerHealthCheck(healthCheckHandler);
    			return cloudEurekaClient;
    		}
    
    		@Bean
    		@ConditionalOnMissingBean(value = ApplicationInfoManager.class,
    				search = SearchStrategy.CURRENT)
    		@org.springframework.cloud.context.config.annotation.RefreshScope
    		@Lazy
    		public ApplicationInfoManager eurekaApplicationInfoManager(
    				EurekaInstanceConfig config) {
    			InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
    			return new ApplicationInfoManager(config, instanceInfo);
    		}
    
    		@Bean
    		@org.springframework.cloud.context.config.annotation.RefreshScope
    		@ConditionalOnBean(AutoServiceRegistrationProperties.class)
    		@ConditionalOnProperty(
    				value = "spring.cloud.service-registry.auto-registration.enabled",
    				matchIfMissing = true)
    		public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
    				CloudEurekaInstanceConfig instanceConfig,
    				ApplicationInfoManager applicationInfoManager, @Autowired(
    						required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
    			return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
    					.with(eurekaClient).with(healthCheckHandler).build();
    		}
    
    	}

    The eurekaClient @Bean is created by RefreshableEurekaClientConfiguration, which is declared inside EurekaClientAutoConfiguration as a static nested class.

    CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
    					config, this.optionalArgs, this.context);

    At this point the CloudEurekaClient constructor runs.

    org.springframework.cloud.netflix.eureka.CloudEurekaClient

    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
    			EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
    			ApplicationEventPublisher publisher) {
    		super(applicationInfoManager, config, args);
    		this.applicationInfoManager = applicationInfoManager;
    		this.publisher = publisher;
    		this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
    				"eurekaTransport");
    		ReflectionUtils.makeAccessible(this.eurekaTransportField);
    	}

    It calls the parent constructor through super.

    public class CloudEurekaClient extends DiscoveryClient {

    Let's look at the constructor of the parent, DiscoveryClient.

    com.netflix.discovery.DiscoveryClient

        @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }
            .
            .
            .

    Ultimately the constructor above is the one that runs. A little further down, it creates a scheduled thread pool (scheduler) and two ThreadPoolExecutors.

    try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);

    scheduleServerEndpointTask is where the EurekaClient sets up its information about the Eureka Server and registers the related scheduler. This step is very important. In short, it creates a Resolver (AsyncResolver) that periodically refreshes the endpoint information of the Eureka Server the client points to, and it creates the EurekaHttpClient objects that send requests to the Eureka Server. For the details, dig into the method itself.
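    As a side note on the two executors created just before scheduleServerEndpointTask, the "direct handoff" comment can be tried out with plain JDK classes. The sketch below is only an illustration: DirectHandoffSketch and daemonFactory are hypothetical names, and daemonFactory is a stand-in for Guava's ThreadFactoryBuilder. It shows that a ThreadPoolExecutor backed by a SynchronousQueue never buffers tasks, so once every thread is busy the next task is rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it only mirrors the shape of the executors created in DiscoveryClient.
class DirectHandoffSketch {

    // Plain-JDK stand-in for Guava's ThreadFactoryBuilder: named daemon threads.
    static ThreadFactory daemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    // Same construction as heartbeatExecutor: core size 1, bounded max, SynchronousQueue (no buffering).
    static ThreadPoolExecutor directHandoffExecutor(String prefix, int maxThreads) {
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), daemonFactory(prefix));
    }

    // Submits (maxThreads + 1) blocking tasks and reports whether the last one was rejected.
    static boolean extraTaskIsRejected(int maxThreads) {
        ThreadPoolExecutor executor = directHandoffExecutor("Sketch-Executor", maxThreads);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < maxThreads; i++) {
                executor.execute(blocking);   // each task occupies its own thread
            }
            executor.execute(blocking);       // no idle thread and no queue slot left
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Two scheduler threads, as in the source comment: one each for heartbeat and cacheRefresh.
        ScheduledExecutorService scheduler =
                Executors.newScheduledThreadPool(2, daemonFactory("Sketch-Scheduler"));
        System.out.println("rejected = " + extraTaskIsRejected(2));
        scheduler.shutdown();
    }
}
```

    The point of this design seems to be that a heartbeat or cache refresh that is still running is never queued up behind itself; a new run either gets a free thread or fails fast.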

     

    After scheduler, heartbeatExecutor, and cacheRefreshExecutor have been created:

            if (clientConfig.shouldFetchRegistry()) {
                try {
                    boolean primaryFetchRegistryResult = fetchRegistry(false);
                    if (!primaryFetchRegistryResult) {
                        logger.info("Initial registry fetch from primary servers failed");
                    }
                    boolean backupFetchRegistryResult = true;
                    if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                        backupFetchRegistryResult = false;
                        logger.info("Initial registry fetch from backup servers failed");
                    }
                    if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                        throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                    }
                } catch (Throwable th) {
                    logger.error("Fetch registry error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }

    At startup, fetchRegistry(false) is called once to initialize the ServerList (the registered applications) from the Eureka Server.

    /**
         * Fetches the registry information.
         *
         * <p>
         * This method tries to get only deltas after the first fetch unless there
         * is an issue in reconciling eureka server and client registry information.
         * </p>
         *
         * @param forceFullRegistryFetch Forces a full registry fetch.
         *
         * @return true if the registry was fetched
         */
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();

    The first time getApplications() is called, nothing has been fetched from the Eureka Server yet, so the result is empty ([]).

     

    if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
    {
    	logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
    	logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
    	logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
    	logger.info("Application is null : {}", (applications == null));
    	logger.info("Registered Applications size is zero : {}",
    		(applications.getRegisteredApplications().size() == 0));
    	logger.info("Application version is -1: {}", (applications.getVersion() == -1));
    	getAndStoreFullRegistry();
    } else {
    	getAndUpdateDelta(applications);
    }
    applications.setAppsHashCode(applications.getReconcileHashCode());
    logTotalInstances();

    During this first initialization the code takes the getAndStoreFullRegistry() branch. Later, when cacheRefreshTask (covered further below) calls this method, getAndUpdateDelta will run instead.
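    The branch condition can be restated as a small pure function, which makes it easier to see why the very first call always ends in a full fetch. This is a sketch under assumptions: FetchModeSketch and decide are hypothetical names, and the cached registry is reduced to an application count and a version number.

```java
// Hypothetical helper that restates the condition in fetchRegistry as a pure function.
class FetchModeSketch {

    enum FetchMode { FULL, DELTA }

    static FetchMode decide(boolean disableDelta,
                            String singleVipAddress,
                            boolean forceFullRegistryFetch,
                            Integer cachedAppCount,   // null models "applications == null"
                            long cachedVersion) {
        boolean hasSingleVip = singleVipAddress != null && !singleVipAddress.isEmpty();
        if (disableDelta
                || hasSingleVip
                || forceFullRegistryFetch
                || cachedAppCount == null
                || cachedAppCount == 0
                || cachedVersion == -1) {
            return FetchMode.FULL;
        }
        return FetchMode.DELTA;
    }

    public static void main(String[] args) {
        // First call: nothing cached yet, so the registered application count is 0.
        System.out.println(decide(false, null, false, 0, 1));
        // Later calls: something is cached and delta is enabled.
        System.out.println(decide(false, null, false, 2, 1));
    }
}
```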

    /**
         * Gets the full registry information from the eureka server and stores it locally.
         * When applying the full registry, the following flow is observed:
         *
         * if (update generation have not advanced (due to another thread))
         *   atomically set the registry to the new registry
         * fi
         *
         * @return the full registry information.
         * @throws Throwable
         *             on error.
         */
        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            logger.info("Getting all instance registry info from the eureka server");
    
            Applications apps = null;
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            

    If eureka.client.registry-refresh-single-vip-address is not set, the value is null, so

    eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) is what gets executed.

    Here eurekaTransport.queryClient is a SessionedEurekaHttpClient.

        @Override
        public EurekaHttpResponse<Applications> getApplications(final String... regions) {
            return execute(new RequestExecutor<Applications>() {
                @Override
                public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
                    return delegate.getApplications(regions);
                }
    
                @Override
                public RequestType getRequestType() {
                    return RequestType.GetApplications;
                }
            });
        }

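    execute is invoked repeatedly. Following the call stack of the thread, there are five levels in total.

    They run in this order: SessionedEurekaHttpClient -> RetryableEurekaHttpClient -> RedirectingEurekaHttpClient -> MetricsCollectingEurekaHttpClient -> JerseyApplicationClient.

    All of these except the last one extend EurekaHttpClientDecorator, and EurekaHttpClientDecorator implements the EurekaHttpClient interface. JerseyApplicationClient, the end of the chain, extends AbstractJerseyEurekaHttpClient, which implements EurekaHttpClient directly.

    For every method, EurekaHttpClientDecorator returns the result of its execute method, passing in an implementation of the nested RequestExecutor interface.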

    EurekaHttpClientDecorator

    public abstract class EurekaHttpClientDecorator implements EurekaHttpClient {
    
        public enum RequestType {
            Register,
            Cancel,
            SendHeartBeat,
            StatusUpdate,
            DeleteStatusOverride,
            GetApplications,
            GetDelta,
            GetVip,
            GetSecureVip,
            GetApplication,
            GetInstance,
            GetApplicationInstance
        }
    
        public interface RequestExecutor<R> {
            EurekaHttpResponse<R> execute(EurekaHttpClient delegate);
    
            RequestType getRequestType();
        }
    
        protected abstract <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor);
        
        @Override
        public EurekaHttpResponse<Void> register(final InstanceInfo info) {
            return execute(new RequestExecutor<Void>() {
                @Override
                public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
                    return delegate.register(info);
                }
    
                @Override
                public RequestType getRequestType() {
                    return RequestType.Register;
                }
            });
        }
    
        @Override
        public EurekaHttpResponse<Void> cancel(final String appName, final String id) {
            return execute(new RequestExecutor<Void>() {
                @Override
                public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
                    return delegate.cancel(appName, id);
                }
    
                @Override
                public RequestType getRequestType() {
                    return RequestType.Cancel;
                }
            });
        }
        .
        .
        .
    }
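    To make the five-level call order easier to picture, here is a heavily simplified model of the same decorator idea. Everything in it is hypothetical (DecoratorChainSketch, HttpClientSketch, TracingDecorator and so on are not Eureka classes); the only thing it borrows from the real code is that each API method is routed through a single execute method, which then hands the request to the next client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the decorator chain; not the Eureka classes.
class DecoratorChainSketch {

    interface HttpClientSketch {
        String getApplications();
    }

    // Counterpart of RequestExecutor: "which call to make on the delegate".
    interface RequestExecutorSketch<R> {
        R execute(HttpClientSketch delegate);
    }

    // Counterpart of EurekaHttpClientDecorator: every API method goes through execute().
    abstract static class DecoratorSketch implements HttpClientSketch {
        protected abstract <R> R execute(RequestExecutorSketch<R> requestExecutor);

        @Override
        public String getApplications() {
            return execute(client -> client.getApplications());
        }
    }

    // A decorator that records that it was passed through, then forwards to the next client.
    static class TracingDecorator extends DecoratorSketch {
        private final String name;
        private final HttpClientSketch next;
        private final List<String> trace;

        TracingDecorator(String name, HttpClientSketch next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        protected <R> R execute(RequestExecutorSketch<R> requestExecutor) {
            trace.add(name);
            return requestExecutor.execute(next);
        }
    }

    // Builds the five-level order described in the text and returns the order of visits.
    static List<String> callOrder() {
        List<String> trace = new ArrayList<>();
        HttpClientSketch jersey = () -> {
            trace.add("JerseyApplicationClient");   // end of the chain: the real HTTP call
            return "applications";
        };
        HttpClientSketch chain = jersey;
        String[] outerToInner = {"SessionedEurekaHttpClient", "RetryableEurekaHttpClient",
                "RedirectingEurekaHttpClient", "MetricsCollectingEurekaHttpClient"};
        for (int i = outerToInner.length - 1; i >= 0; i--) {
            chain = new TracingDecorator(outerToInner[i], chain, trace);
        }
        chain.getApplications();
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(callOrder());
    }
}
```

    In the real classes each level adds its own behavior inside execute (session renewal, retry, redirect handling, metrics) instead of just recording a name.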

    Anyway, let's look at the execute method of RetryableEurekaHttpClient.

    RetryableEurekaHttpClient

    @Override
        protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
            List<EurekaEndpoint> candidateHosts = null;
            int endpointIdx = 0;
            for (int retry = 0; retry < numberOfRetries; retry++) {
                EurekaHttpClient currentHttpClient = delegate.get();
                EurekaEndpoint currentEndpoint = null;
                if (currentHttpClient == null) {
                    if (candidateHosts == null) {
                        candidateHosts = getHostCandidates();
                        if (candidateHosts.isEmpty()) {
                            throw new TransportException("There is no known eureka server; cluster server list is empty");
                        }
                    }
                    if (endpointIdx >= candidateHosts.size()) {
                        throw new TransportException("Cannot execute request on any known server");
                    }
    
                    currentEndpoint = candidateHosts.get(endpointIdx++);
                    currentHttpClient = clientFactory.newClient(currentEndpoint);
                }

    There is a method called getHostCandidates(). It returns the information about the Eureka Servers that this EurekaClient currently points to.

    private List<EurekaEndpoint> getHostCandidates() {
            List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
            quarantineSet.retainAll(candidateHosts);
    
            // If enough hosts are bad, we have no choice but start over again
            int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
            //Prevent threshold is too large
            if (threshold > candidateHosts.size()) {
                threshold = candidateHosts.size();
            }
            if (quarantineSet.isEmpty()) {
                // no-op
            } else if (quarantineSet.size() >= threshold) {
                logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
                quarantineSet.clear();
            } else {
                List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
                for (EurekaEndpoint endpoint : candidateHosts) {
                    if (!quarantineSet.contains(endpoint)) {
                        remainingHosts.add(endpoint);
                    }
                }
                candidateHosts = remainingHosts;
            }
    
            return candidateHosts;
        }

    The first line is the important one. It calls clusterResolver.getClusterEndpoints(), and the name Resolver should sound familiar: it is the AsyncResolver created by the scheduleServerEndpointTask call we saw earlier.
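    The rest of getHostCandidates() is the quarantine handling. A minimal sketch of that logic with plain strings instead of EurekaEndpoint objects is given below; HostCandidatesSketch is a hypothetical name, and the percentage 0.66 used in main is only an illustrative value, not a claim about the configured default.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical restatement of getHostCandidates() using host names as plain strings.
class HostCandidatesSketch {

    static List<String> hostCandidates(List<String> resolved, Set<String> quarantine,
                                       double refreshPercentage) {
        quarantine.retainAll(resolved);              // forget hosts that are no longer resolved
        int threshold = (int) (resolved.size() * refreshPercentage);
        if (threshold > resolved.size()) {
            threshold = resolved.size();
        }
        if (quarantine.isEmpty()) {
            return resolved;
        }
        if (quarantine.size() >= threshold) {
            quarantine.clear();                      // too many bad hosts: start over with all of them
            return resolved;
        }
        List<String> remaining = new ArrayList<>();
        for (String host : resolved) {
            if (!quarantine.contains(host)) {
                remaining.add(host);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<String> resolved = Arrays.asList("a", "b", "c", "d");
        Set<String> quarantine = new HashSet<>(Arrays.asList("a"));
        // threshold = (int) (4 * 0.66) = 2, and only one host is quarantined, so "a" is skipped
        System.out.println(hostCandidates(resolved, quarantine, 0.66));
    }
}
```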

     

    EurekaHttpClients.defaultBootstrapResolver()

            return new AsyncResolver<>(
                    EurekaClientNames.BOOTSTRAP,
                    delegateResolver,
                    initialValue,
                    1,
                    clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000

    EurekaClientConfigBean

        /**
    	 * Indicates how often(in seconds) to poll for changes to eureka server information.
    	 * Eureka servers could be added or removed and this setting controls how soon the
    	 * eureka clients should know about it.
    	 */
    	private int eurekaServiceUrlPollIntervalSeconds = 5 * MINUTES;

    With the default clientConfig this is 5 minutes.

    It can be changed with eureka.client.eureka-service-url-poll-interval-seconds.

    AsyncResolver

        AsyncResolver(String name,
                      ClusterResolver<T> delegate,
                      List<T> initialValue,
                      int executorThreadPoolSize,
                      int refreshIntervalMs,
                      int warmUpTimeoutMs) {
            this.name = name;
            this.delegate = delegate;
            this.refreshIntervalMs = refreshIntervalMs;
            this.warmUpTimeoutMs = warmUpTimeoutMs;
    
            this.executorService = Executors.newScheduledThreadPool(1,
                    new ThreadFactoryBuilder()
                            .setNameFormat("AsyncResolver-" + name + "-%d")
                            .setDaemon(true)
                            .build());
    
            this.threadPoolExecutor = new ThreadPoolExecutor(
                    1, executorThreadPoolSize, 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),  // use direct handoff
                    new ThreadFactoryBuilder()
                            .setNameFormat("AsyncResolver-" + name + "-executor-%d")
                            .setDaemon(true)
                            .build()
            );
    
            this.backgroundTask = new TimedSupervisorTask(
                    this.getClass().getSimpleName(),
                    executorService,
                    threadPoolExecutor,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS,
                    5,
                    updateTask
            );
    
            this.resultsRef = new AtomicReference<>(initialValue);
            Monitors.registerObject(name, this);
        }

    Looking at the constructor, backgroundTask is set up at creation time to run updateTask every refreshIntervalMs (300000 ms = 5 minutes).

    private final Runnable updateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    List<T> newList = delegate.getClusterEndpoints();
                    if (newList != null) {
                        resultsRef.getAndSet(newList);
                        lastLoadTimestamp = System.currentTimeMillis();
                    } else {
                        logger.warn("Delegate returned null list of cluster endpoints");
                    }
                    logger.debug("Resolved to {}", newList);
                } catch (Exception e) {
                    logger.warn("Failed to retrieve cluster endpoints from the delegate", e);
                }
            }
        };

    This is just an implementation of the Runnable interface and there is not much to it. The part to notice is that it gets the server endpoint list from delegate.getClusterEndpoints() and sets it on the resultsRef member field.

     

    Now, going back to

    List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();

    We now know that clusterResolver is an AsyncResolver, so let's follow it.

    @Override
        public List<T> getClusterEndpoints() {
            long delay = refreshIntervalMs;
            if (warmedUp.compareAndSet(false, true)) {
                if (!doWarmUp()) {
                    delay = 0;
                }
            }
            if (scheduled.compareAndSet(false, true)) {
                scheduleTask(delay);
            }
            return resultsRef.get();
        }

    There is also logic around warmedUp and scheduled, but for now just note that scheduled is false by default and that the method returns resultsRef.get().

    scheduleTask runs after delay (5 minutes), and at this moment resultsRef holds the server endpoint list.

    [AwsEndpoint{ serviceUrl='http://localhost:8761/eureka/', region='us-east-1', zone='defaultZone'}]

    resultsRef was initialized with the server endpoint list that had been resolved before the AsyncResolver was created in scheduleServerEndpointTask.

    /* visible for testing */ void scheduleTask(long delay) {
            executorService.schedule(
                    backgroundTask, delay, TimeUnit.MILLISECONDS);
        }

    This value will then be updated by scheduleTask 5 minutes later.
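    The overall pattern (answer from a cached value, schedule the refresh lazily on first use) can be sketched with JDK classes only. Note the assumptions: CachedResolverSketch is a hypothetical class, the delegate is reduced to a Supplier, warm-up is left out, and scheduleWithFixedDelay is used in place of the TimedSupervisorTask that the real AsyncResolver relies on.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical reduction of AsyncResolver: a cached list plus a lazily scheduled refresh.
class CachedResolverSketch<T> {
    private final Supplier<List<T>> delegate;          // stands in for the delegate resolver
    private final AtomicReference<List<T>> resultsRef; // the value handed to callers
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final long refreshIntervalMs;
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "CachedResolverSketch");
                t.setDaemon(true);
                return t;
            });

    CachedResolverSketch(Supplier<List<T>> delegate, List<T> initialValue, long refreshIntervalMs) {
        this.delegate = delegate;
        this.resultsRef = new AtomicReference<>(initialValue);
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Counterpart of updateTask: replace the cached list if the delegate returned one.
    void refreshNow() {
        List<T> newList = delegate.get();
        if (newList != null) {
            resultsRef.set(newList);
        }
    }

    // Counterpart of getClusterEndpoints: schedule once, always answer from the cache.
    List<T> getClusterEndpoints() {
        if (scheduled.compareAndSet(false, true)) {
            executor.scheduleWithFixedDelay(this::refreshNow,
                    refreshIntervalMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        }
        return resultsRef.get();
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        // Placeholder values; a real delegate would read the configured service URLs.
        CachedResolverSketch<String> resolver = new CachedResolverSketch<String>(
                () -> Arrays.asList("refreshed-endpoint"),
                Arrays.asList("initial-endpoint"),
                300_000L);
        System.out.println(resolver.getClusterEndpoints()); // still the initial value
        resolver.refreshNow();                              // what the background task would do later
        System.out.println(resolver.getClusterEndpoints());
        resolver.shutdown();
    }
}
```

    Callers never block on the delegate: they always get whatever the last refresh stored, which is why the first call can return the initial value immediately.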

    As seen above, updateTask is registered in backgroundTask:

     private final Runnable updateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    List<T> newList = delegate.getClusterEndpoints();
                    if (newList != null) {
                        resultsRef.getAndSet(newList);

    Here delegate is a ZoneAffinityClusterResolver. This Resolver is also created when scheduleServerEndpointTask is called.

    ZoneAffinityClusterResolver

    @Override
        public List<AwsEndpoint> getClusterEndpoints() {
            List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
            List<AwsEndpoint> myZoneEndpoints = parts[0];
            List<AwsEndpoint> remainingEndpoints = parts[1];
            List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);
            if (!zoneAffinity) {
                Collections.reverse(randomizedList);
            }
    
            logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList);
    
            return randomizedList;
        }

    Looking at getClusterEndpoints() above, it again calls delegate.getClusterEndpoints().

    This time delegate is a ConfigClusterResolver.
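    What ZoneAffinityClusterResolver does with the list it gets back can be sketched as "split by zone, randomize each part, put my zone first". The code below is a rough model under assumptions: ZoneAffinitySketch and its Endpoint class are hypothetical, and Collections.shuffle replaces the randomizer that the real resolver uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical model of the zone-affinity ordering; not the Eureka implementation.
class ZoneAffinitySketch {

    // Minimal endpoint model: just a URL and the zone it lives in.
    static final class Endpoint {
        final String url;
        final String zone;

        Endpoint(String url, String zone) {
            this.url = url;
            this.zone = zone;
        }

        @Override
        public String toString() {
            return url + "@" + zone;
        }
    }

    static List<Endpoint> order(List<Endpoint> all, String myZone, boolean zoneAffinity, Random rnd) {
        List<Endpoint> mine = new ArrayList<>();
        List<Endpoint> others = new ArrayList<>();
        for (Endpoint e : all) {
            if (myZone.equals(e.zone)) {
                mine.add(e);
            } else {
                others.add(e);
            }
        }
        Collections.shuffle(mine, rnd);      // stand-in for the real endpoint randomizer
        Collections.shuffle(others, rnd);
        List<Endpoint> merged = new ArrayList<>(mine);
        merged.addAll(others);
        if (!zoneAffinity) {
            Collections.reverse(merged);     // same trick as the source: other zones come first
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Endpoint> all = Arrays.asList(
                new Endpoint("http://eureka-1.example/eureka/", "zone-b"),
                new Endpoint("http://eureka-2.example/eureka/", "zone-a"),
                new Endpoint("http://eureka-3.example/eureka/", "zone-b"));
        System.out.println(order(all, "zone-a", true, new Random()));
    }
}
```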

    ConfigClusterResolver 

    @Override
        public List<AwsEndpoint> getClusterEndpoints() {
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                if (logger.isInfoEnabled()) {
                    logger.info("Resolving eureka endpoints via DNS: {}", getDNSName());
                }
                return getClusterEndpointsFromDns();
            } else {
                logger.info("Resolving eureka endpoints via configuration");
                return getClusterEndpointsFromConfig();
            }
        }

    If eureka.client.use-dns-for-fetching-service-urls is not set, it defaults to false, so the endpoints are built through getClusterEndpointsFromConfig().

    In the end, this is what returns the EurekaEndpoint (the server endpoint) based on the config we put in application.yml. The AwsEndpoint above extends DefaultEndpoint, and DefaultEndpoint implements EurekaEndpoint.

    public class AwsEndpoint extends DefaultEndpoint {
    public class DefaultEndpoint implements EurekaEndpoint {

     

    private List<AwsEndpoint> getClusterEndpointsFromConfig() {
            String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
            String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);
    
            Map<String, List<String>> serviceUrls = EndpointUtils
                    .getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka());
    
            List<AwsEndpoint> endpoints = new ArrayList<>();
            for (String zone : serviceUrls.keySet()) {
                for (String url : serviceUrls.get(zone)) {
                    try {
                        endpoints.add(new AwsEndpoint(url, getRegion(), zone));
                    } catch (Exception ignore) {
                        logger.warn("Invalid eureka server URI: {}; removing from the server pool", url);
                    }
                }
            }
    
            logger.debug("Config resolved to {}", endpoints);
    
            if (endpoints.isEmpty()) {
                logger.error("Cannot resolve to any endpoints from provided configuration: {}", serviceUrls);
            }
    
            return endpoints;
        }

    Looking at getClusterEndpointsFromConfig(), you can see that the zone, the service URLs, and so on are all derived from clientConfig.

    private final EurekaClientConfig clientConfig;
    
    public class DefaultEurekaClientConfig implements EurekaClientConfig {

    clientConfig is typed as the EurekaClientConfig interface.

    In EurekaClientAutoConfiguration,

    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        return new EurekaClientConfigBean();
    }

    it is created by the eurekaClientConfigBean method and injected.

    The injection happens when the eurekaClient bean itself is created.

     

    That is how the EurekaEndpoint list is obtained.


    SessionedEurekaHttpClient -> RetryableEurekaHttpClient -> RedirectingEurekaHttpClient -> MetricsCollectingEurekaHttpClient -> JerseyApplicationClient

    It has been a long detour, but let's go back to RetryableEurekaHttpClient:

                    currentEndpoint = candidateHosts.get(endpointIdx++);
                    currentHttpClient = clientFactory.newClient(currentEndpoint);
                try {
                    EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
                    if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                        delegate.set(currentHttpClient);
                        if (retry > 0) {
                            logger.info("Request execution succeeded on retry #{}", retry);
                        }
                        return response;
                    }
                    logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
                } catch (Exception e) {
                    logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
                }

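    We already know that candidateHosts is the list of EurekaEndpoints.

    Here currentHttpClient points to http://localhost:8761/eureka/ by default, the same endpoint that resultsRef held above.

    requestExecutor.execute(currentHttpClient) therefore calls into the next client in the chain, RedirectingEurekaHttpClient, and its execute runs.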
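    The retry loop itself boils down to "walk the candidate list until one server gives an acceptable answer". The following is a simplified sketch, not the real method: RetrySketch is a hypothetical name, hosts are plain strings, the call is modeled as a function returning an HTTP status code, and "acceptable" is assumed to mean any 2xx status, which is looser than whatever the real serverStatusEvaluator decides.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical reduction of the retry loop in RetryableEurekaHttpClient.execute.
class RetrySketch {

    // Tries candidates in order, at most numberOfRetries times, and returns the host that answered.
    static String executeWithRetry(List<String> candidateHosts, int numberOfRetries,
                                   Function<String, Integer> call) {
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            if (endpointIdx >= candidateHosts.size()) {
                throw new IllegalStateException("Cannot execute request on any known server");
            }
            String host = candidateHosts.get(endpointIdx++);
            try {
                int status = call.apply(host);
                if (status >= 200 && status < 300) {   // assumption: 2xx counts as success
                    return host;
                }
            } catch (RuntimeException e) {
                // fall through and try the next host, as the source does after logging
            }
        }
        throw new IllegalStateException("Retry limit reached; giving up on completing the request");
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("bad-host", "good-host");
        String servedBy = executeWithRetry(hosts, 3, host -> host.equals("good-host") ? 200 : 503);
        System.out.println(servedBy);
    }
}
```

    The real method additionally remembers the client that succeeded (delegate.set(currentHttpClient)) so that later requests go straight to it, and it puts failing endpoints into the quarantine set.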

    Skipping ahead to the final stage of the chain:

    SessionedEurekaHttpClient -> RetryableEurekaHttpClient -> RedirectingEurekaHttpClient -> MetricsCollectingEurekaHttpClient -> JerseyApplicationClient

    JerseyApplicationClient

    AbstractJerseyEurekaHttpClient

        @Override
        public EurekaHttpResponse<Applications> getApplications(String... regions) {
            return getApplicationsInternal("apps/", regions);
        }
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
            ClientResponse response = null;
            String regionsParamValue = null;
            try {
                WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
                if (regions != null && regions.length > 0) {
                    regionsParamValue = StringUtil.join(regions);
                    webResource = webResource.queryParam("regions", regionsParamValue);
                }
                Builder requestBuilder = webResource.getRequestBuilder();
                addExtraHeaders(requestBuilder);
                response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
    
                Applications applications = null;
                if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
                    applications = response.getEntity(Applications.class);
                }
                return anEurekaHttpResponse(response.getStatus(), Applications.class)
                        .headers(headersOf(response))
                        .entity(applications)
                        .build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                            serviceUrl, urlPath,
                            regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                            response == null ? "N/A" : response.getStatus()
                    );
                }
                if (response != null) {
                    response.close();
                }
            }
        }

    Looking at the method above, getApplicationsInternal is where the REST API call is finally made through the Jersey library, fetching the list of Eureka Clients (Applications) registered at the Eureka endpoint.
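    For readers who want to reproduce the same request outside of Eureka, a rough equivalent can be written with the JDK's java.net.http client (Java 11 or later). This is not how the library does it, since it uses Jersey as shown above. AppsRequestSketch is a hypothetical class, joining the regions with a comma is an assumption about StringUtil.join, and the URL in main is the local default seen earlier.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical JDK-only counterpart of getApplicationsInternal("apps/", regions).
class AppsRequestSketch {

    // Builds GET {serviceUrl}apps/ with an optional regions query parameter.
    static HttpRequest buildRequest(String serviceUrl, String... regions) {
        String path = "apps/";
        if (regions != null && regions.length > 0) {
            // Assumption: regions are joined with commas; no URL encoding is applied here.
            path += "?regions=" + String.join(",", regions);
        }
        URI uri = URI.create(serviceUrl).resolve(path);
        return HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // Sends the request; this needs a running Eureka Server at serviceUrl.
    static String fetchApplicationsJson(String serviceUrl) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(serviceUrl), HttpResponse.BodyHandlers.ofString());
        return response.statusCode() == 200 ? response.body() : null;
    }

    public static void main(String[] args) {
        // Only builds the request, so it runs without a server.
        System.out.println(buildRequest("http://localhost:8761/eureka/").uri());
    }
}
```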

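    Inspecting applications after the response comes back from requestBuilder.accept(...).get(...) shows that two Eureka Clients are registered.

    The current project is debug-svc:8080, and the two applications that were registered beforehand were found. Sending a request to the same URI the client used shows the same data in the browser.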

     

    This completes the fetchRegistry step inside the DiscoveryClient constructor, and the Applications registered on the Eureka Server have been found.

    if (apps == null) {
                logger.error("The application is null for some reason. Not storing this information");
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                localRegionApps.set(this.filterAndShuffle(apps));
                logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
            } else {
                logger.warn("Not updating applications as another thread is updating it already");
            }

    The applications are then set on DiscoveryClient's localRegionApps!

    @Override
    public Applications getApplications() {
        return localRegionApps.get();
    }

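    Also, by injecting the EurekaClient bean (whose implementation extends DiscoveryClient) and calling getApplications(), the applications collected through the process so far are always available.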
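    The generation check around localRegionApps.set(...) is worth a closer look: the fetched result is stored only if no other thread has advanced the generation since the fetch began. A minimal sketch of that guard follows, with a list of application names standing in for Applications. RegistryCacheSketch and storeIfCurrent are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the generation guard used when storing the full registry.
class RegistryCacheSketch {
    private final AtomicLong fetchRegistryGeneration = new AtomicLong(0);
    private final AtomicReference<List<String>> localRegionApps =
            new AtomicReference<List<String>>(Collections.emptyList());

    long currentGeneration() {
        return fetchRegistryGeneration.get();
    }

    // Stores the fetched apps only if the generation is still the one observed before fetching.
    boolean storeIfCurrent(long observedGeneration, List<String> fetchedApps) {
        if (fetchedApps == null) {
            return false;
        }
        if (fetchRegistryGeneration.compareAndSet(observedGeneration, observedGeneration + 1)) {
            localRegionApps.set(fetchedApps);
            return true;
        }
        return false;   // another update won the race; keep what it stored
    }

    List<String> getApplications() {
        return localRegionApps.get();
    }

    public static void main(String[] args) {
        RegistryCacheSketch cache = new RegistryCacheSketch();
        long generation = cache.currentGeneration();
        cache.storeIfCurrent(generation, Arrays.asList("APP-A", "APP-B"));
        // A second writer that still holds the old generation is ignored.
        cache.storeIfCurrent(generation, Arrays.asList("STALE"));
        System.out.println(cache.getApplications());
    }
}
```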

    EurekaDiscoveryClientConfiguration

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @ConditionalOnDiscoveryEnabled
    @ConditionalOnBlockingDiscoveryEnabled
    public class EurekaDiscoveryClientConfiguration {
    
    	/**
    	 * Deprecated in favor of auto configuration order.
    	 * @return Marker bean
    	 * @deprecated in favor of auto configuration order.
    	 */
    	@Deprecated
    	@Bean
    	public Marker eurekaDiscoverClientMarker() {
    		return new Marker();
    	}
    
    	@Bean
    	@ConditionalOnMissingBean
    	public EurekaDiscoveryClient discoveryClient(EurekaClient client,
    			EurekaClientConfig clientConfig) {
    		return new EurekaDiscoveryClient(client, clientConfig);
    	}

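    discoveryClient is registered as a Bean in EurekaDiscoveryClientConfiguration, and the returned EurekaDiscoveryClient implements Spring Cloud's DiscoveryClient interface (org.springframework.cloud.client.discovery.DiscoveryClient, not com.netflix.discovery.DiscoveryClient), so we can reach it at any time through @Autowired or constructor injection.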

     

    Finally, the cacheRefresh and heartbeat schedulers, which keep sending requests to the Eureka endpoint, will be covered in part 2.

     
