SpringCloud中服务注册与发现Eureka以及注册源码的示例分析(eureka,springcloud,开发技术)

时间:2024-05-06 20:06:06 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

服务注册与发现

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

关系:

1.服务提供者在启动时,向注册中心注册自己提供的服务。

2.服务消费者在启动时,向注册中心订阅自己所需的服务。

3.注册中心返回服务提供者地址给消费者。

4.服务消费者从提供者地址中调用消费者。

Eureka介绍:

Spring Cloud Eureka 是Spring Cloud Netflix 微服务套件中的一部分, 它基于Netflix Eureka 做了二次封装, 主要负责完成微服务架构中的服务治理功能。Spring Cloud 通过为Eureka 增加了Spring Boot 风格的自动化配置,我们只需通过简单引入依赖和注解配置就能让Spring Boot 构建的微服务应用轻松地与Eureka 服务治理体系进行整合。

Netflix Eureka

Spring Cloud Eureka, 使用Netflix Eureka来实现服务注册与发现, 它既包含了服务端组件,也包含了客户端组件,并且服务端与客户端均采用Java编写,所以Eureka主要适用于通过Java实现的分布式系统,或是与NM兼容语言构建的系统。但是, 由于Eureka服务端的服务治理机制提供了完备的RESTfulAPL所以它也支持将非Java语言构建的微服务应用纳入Eureka的服务治理体系中来。只是在使用其他语言平台的时候,需要自己来实现Eureka的客户端程序。不过庆幸的是,在目前几个较为流行的开发平台上,都已经有了一些针对Eureka 注册中心的客户端实现框架, 比如.NET平台的Steeltoe、Node.js 的eureka-js-client等。

Eureka服务端, 我们也称为服务注册中心。它同其他服务注册中心一样,支持高可用配置。它依托于强一致性提供良好的服务实例可用性, 可以应对多种不同的故障场景。如果Eureka以集群模式部署,当集群中有分片出现故障时,那么Eureka就转入自我保护模式。它允许在分片故障期间继续提供服务的发现和注册,当故障分片恢复运行时, 集群中的其他分片会把它们的状态再次同步回来。以在AWS 上的实践为例, Netflix推荐每个可用的区域运行一个Eureka服务端,通过它来形成集群。不同可用区域的服务注册中心通过异步模式互相复制各自的状态,这意味着在任意给定的时间点每个实例关于所有服务的状态是有细微差别的。

Eureka客户端,主要处理服务的注册与发现。客户端服务通过注解和参数配置的方式,嵌入在客户端应用程序的代码中, 在应用程序运行时,Eureka客户端向注册中心注册自身提供的服务并周期性地发送心跳来更新它的服务租约。同时,它也能从服务端查询当前注册的服务信息并把它们缓存到本地并周期性地刷新服务状态。

Eureka环境搭建

一、服务端搭建

1.新建项目

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

2.配置

#server(eureka默认端口为:8761)server.port=8761#springspring.application.name=spring-cloud-server#eureka#是否注册到eurekaeureka.client.register-with-eureka=false#是否从eureka获取注册信息eureka.client.fetch-registry=false#eureka服务器的地址(注意:地址最后面的/eureka/这个是固定值)eureka.client.serviceUrl.defaultZone=http://localhost:${server.port}/eureka/
@EnableEurekaServer@SpringBootApplicationpublicclassCloudTestEurekaServerApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(CloudTestEurekaServerApplication.class,args); }}

3.启动服务

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

二、客户端(provider)搭建

1.创建项目

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

2.配置

#serverserver.port=7777#springspring.application.name=spring-cloud-provider#eurekaeureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
@EnableEurekaClient@SpringBootApplicationpublicclassCloudTestEurekaProviderApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(CloudTestEurekaProviderApplication.class,args); }}

3.启动服务

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

三、消费者(consumer)搭建

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

自我保护

当我们在本地调试基于Eureka的程序时, 基本上都会碰到这样一个问题, 在服务注册中心的信息面板中出现类似下面的红色警告信息:

EMERGENCY! EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT.RENEWALS ARE LESSER TH邸THRESHOLD AND HENCE THE INSTANCES ARE NOT BEING EXPI邸D JUST TO BE SAFE.

实际上, 该警告就是触发了EurekaServer的自我保护机制。之前我们介绍过, 服务注册到EurekaServer之后,会维护一个心跳连接,告诉EurekaServer自己还活着。EurekaServer在运行期间,会统计心跳失败的比例在15分钟之内是否低于85%, 如果出现低于的情况(在单机调试的时候很容易满足, 实际在生产环境上通常是由于网络不稳定导致), EurekaServer会将当前的实例注册信息保护起来, 让这些实例不会过期, 尽可能保护这些注册信息。但是, 在这段保护期间内实例若出现问题, 那么客户端很容易拿到实际已经不存在的服务实例, 会出现调用失败的清况, 所以客户端必须要有容错机制, 比如可以使用请求重试、断路器等机制。

Euraka服务启动过程源码解析

  1. 启动日志如下

[main]o.s.j.e.a.AnnotationMBeanExporter:Locatedmanagedbean'environmentManager':registeringwithJMXserverasMBean[org.springframework.cloud.context.environment:name=environmentManager,type=EnvironmentManager][main]o.s.j.e.a.AnnotationMBeanExporter:Locatedmanagedbean'serviceRegistryEndpoint':registeringwithJMXserverasMBean[org.springframework.cloud.client.serviceregistry.endpoint:name=serviceRegistryEndpoint,type=ServiceRegistryEndpoint][main]o.s.j.e.a.AnnotationMBeanExporter:Locatedmanagedbean'refreshScope':registeringwithJMXserverasMBean[org.springframework.cloud.context.scope.refresh:name=refreshScope,type=RefreshScope][main]o.s.j.e.a.AnnotationMBeanExporter:Locatedmanagedbean'configurationPropertiesRebinder':registeringwithJMXserverasMBean[org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=ab7395e,type=ConfigurationPropertiesRebinder][main]o.s.j.e.a.AnnotationMBeanExporter:Locatedmanagedbean'refreshEndpoint':registeringwithJMXserverasMBean[org.springframework.cloud.endpoint:name=refreshEndpoint,type=RefreshEndpoint][main]o.s.c.support.DefaultLifecycleProcessor:Startingbeansinphase0[main]o.s.c.n.e.s.EurekaServiceRegistry:Registeringapplicationspring-cloud-serverwitheurekawithstatusUP[Thread-11]o.s.c.n.e.server.EurekaServerBootstrap:Settingtheeurekaconfiguration..[Thread-11]o.s.c.n.e.server.EurekaServerBootstrap:Eurekadatacentervalueeureka.datacenterisnotset,defaultingtodefault[Thread-11]o.s.c.n.e.server.EurekaServerBootstrap:Eurekaenvironmentvalueeureka.environmentisnotset,defaultingtotest[Thread-11]o.s.c.n.e.server.EurekaServerBootstrap:isAwsreturnedfalse[Thread-11]o.s.c.n.e.server.EurekaServerBootstrap:Initializedservercontext[Thread-11]c.n.e.r.PeerAwareInstanceRegistryImpl:Got1instancesfromneighboringDSnode[Thread-11]c.n.e.r.PeerAwareInstanceRegistryImpl:Renewthresholdis:1[Thread-11]c.n.e.r.PeerAwareInstanceRegistryImpl:ChangingstatustoUP[Thread-11]e.s.EurekaServerInitializerConfiguration:StartedEurekaServer[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8761(http)[main].s.c.n.e.s.EurekaAutoServiceRegistration:Updatingportto8761[main]c.m.e.CloudTestEurekaServerApplication:StartedCloudTestEurekaServerApplicationin4.076seconds(JVMrunningfor4.54)[nio-8761-exec-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringFrameworkServlet'dispatcherServlet'[nio-8761-exec-1]o.s.web.servlet.DispatcherServlet:FrameworkServlet'dispatcherServlet':initializationstarted[nio-8761-exec-1]o.s.web.servlet.DispatcherServlet:FrameworkServlet'dispatcherServlet':initializationcompletedin11ms[a-EvictionTimer]c.n.e.registry.AbstractInstanceRegistry:RunningtheevicttaskwithcompensationTime0ms

2.可以看到在"Started Eureka Server"这一行,发现执行了类EurekaServerInitializerConfiguration,所以它是程序入口,进入:

@ConfigurationpublicclassEurekaServerInitializerConfiguration implementsServletContextAware,SmartLifecycle,Ordered{@Override publicvoidstart(){ newThread(newRunnable(){ @Override publicvoidrun(){ try{ //TODO:isthisclassevenneedednow? eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("StartedEurekaServer"); publish(newEurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running=true; publish(newEurekaServerStartedEvent(getEurekaServerConfig())); } catch(Exceptionex){ //Help! log.error("CouldnotinitializeEurekaservletcontext",ex); } } }).start(); }

3.可以发现这个类上面有注解@Configuration,说明这个类可以被spring容器感知到,然后实例化,并且会执行start()方法,开启一个线程执行功能;然后再进入contextInitialied方法:

publicclassEurekaServerBootstrap{publicvoidcontextInitialized(ServletContextcontext){ try{ initEurekaEnvironment(); initEurekaServerContext(); context.setAttribute(EurekaServerContext.class.getName(),this.serverContext); } catch(Throwablee){ log.error("Cannotbootstrapeurekaserver:",e); thrownewRuntimeException("Cannotbootstrapeurekaserver:",e); } }

4.可发现上面方法主要有两个功能:环境初始化和服务初始化,这里只看服务初始化,进入initEurekaServerContext()方法,可以看到下面代码:

publicclassEurekaServerBootstrap{protectedvoidinitEurekaServerContext()throwsException{ //Forbackwardcompatibility JsonXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); if(isAws(this.applicationInfoManager.getInfo())){ this.awsBinder=newAwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig,this.registry,this.applicationInfoManager); this.awsBinder.start(); } EurekaServerContextHolder.initialize(this.serverContext); log.info("Initializedservercontext"); //Copyregistryfromneighboringeurekanode intregistryCount=this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager,registryCount); //Registerallmonitoringstatistics. EurekaMonitors.registerAllStats(); }

5.上面代码首先初始化server上下文,然后再去注册。可以看到先获得变量registryCount(注册表),然后通过调用openForTraffic方法,为注册监测数据做准备,或者可以这样说(检测监测的数据是否存活,如果不存活,做剔除操作),下面是函数一步一步进入的情况:

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

publicclassInstanceRegistryextendsPeerAwareInstanceRegistryImpl implementsApplicationContextAware{@Override publicvoidopenForTraffic(ApplicationInfoManagerapplicationInfoManager,intcount){ super.openForTraffic(applicationInfoManager, count==0?this.defaultOpenForTrafficCount:count); }
@OverridepublicvoidopenForTraffic(ApplicationInfoManagerapplicationInfoManager,intcount){//Renewalshappenevery30secondsandforaminuteitshouldbeafactorof2.this.expectedNumberOfRenewsPerMin=count*2;this.numberOfRenewsPerMinThreshold=(int)(this.expectedNumberOfRenewsPerMin*serverConfig.getRenewalPercentThreshold());logger.info("Got"+count+"instancesfromneighboringDSnode");logger.info("Renewthresholdis:"+numberOfRenewsPerMinThreshold);this.startupTime=System.currentTimeMillis();if(count>0){this.peerInstancesTransferEmptyOnStartup=false;}DataCenterInfo.NameselfName=applicationInfoManager.getInfo().getDataCenterInfo().getName();booleanisAws=Name.Amazon==selfName;if(isAws&&serverConfig.shouldPrimeAwsReplicaConnections()){logger.info("PrimingAWSconnectionsforallreplicas..");primeAwsReplicas(applicationInfoManager);}logger.info("ChangingstatustoUP");applicationInfoManager.setInstanceStatus(InstanceStatus.UP);super.postInit();}

进入postInit()方法:

protectedvoidpostInit(){renewsLastMin.start();if(evictionTaskRef.get()!=null){evictionTaskRef.get().cancel();}evictionTaskRef.set(newEvictionTask());evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());}

之前在日志中有一句:"Running the evict task with compensationTime 0ms",这句话就是做节点剔除操作,就是在EvictionTask()方法执行的。

失效剔除

有些时候, 我们的服务实例并不一定会正常下线, 可能由于内存溢出、网络故障等原因使得服务不能正常工作, 而服务注册中心并未收到“服务下线” 的请求。为了从服务列表中将这些无法提供服务的实例剔除, Eureka Server在启动的时候会创建一个定时任务,默认每隔一段时间(默认为60秒) 将当前清单中超时(默认为90秒)没有续约的服务剔除出去。

classEvictionTaskextendsTimerTask{privatefinalAtomicLonglastExecutionNanosRef=newAtomicLong(0l);@Overridepublicvoidrun(){try{longcompensationTimeMs=getCompensationTimeMs();logger.info("RunningtheevicttaskwithcompensationTime{}ms",compensationTimeMs);evict(compensationTimeMs);}catch(Throwablee){logger.error("Couldnotruntheevicttask",e);}}/***computeacompensationtimedefinedastheactualtimethistaskwasexecutedsincethepreviteration,*vstheconfiguredamountoftimeforexecution.Thisisusefulforcaseswherechangesintime(dueto*clockskeworgcforexample)causestheactualevictiontasktoexecutelaterthanthedesiredtime*accordingtotheconfiguredcycle.*/longgetCompensationTimeMs(){longcurrNanos=getCurrentTimeNano();longlastNanos=lastExecutionNanosRef.getAndSet(currNanos);if(lastNanos==0l){return0l;}longelapsedMs=TimeUnit.NANOSECONDS.toMillis(currNanos-lastNanos);longcompensationTime=elapsedMs-serverConfig.getEvictionIntervalTimerInMs();returncompensationTime<=0l?0l:compensationTime;}longgetCurrentTimeNano(){//fortestingreturnSystem.nanoTime();}}

Eureka客户端注册过程

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

  1. 入口:DiscoveryClient

首先上日志信息:

[main]com.netflix.discovery.DiscoveryClient:InitializingEurekainregionus-east-1[main]com.netflix.discovery.DiscoveryClient:Clientconfiguredtoneitherregisternorqueryfordata.[main]com.netflix.discovery.DiscoveryClient:DiscoveryClientinitializedattimestamp1517475139464withinitialinstancescount:0[main]c.n.eureka.DefaultEurekaServerContext:Initializing...

进入DiscoveryClient,首先看它的构造函数,里面执行initScheduledTasks()方法进行注册

@InjectDiscoveryClient(ApplicationInfoManagerapplicationInfoManager,EurekaClientConfigconfig,AbstractDiscoveryClientOptionalArgsargs,Provider<BackupRegistry>backupRegistryProvider){if(args!=null){this.healthCheckHandlerProvider=args.healthCheckHandlerProvider;this.healthCheckCallbackProvider=args.healthCheckCallbackProvider;this.eventListeners.addAll(args.getEventListeners());this.preRegistrationHandler=args.preRegistrationHandler;}else{this.healthCheckCallbackProvider=null;this.healthCheckHandlerProvider=null;this.preRegistrationHandler=null;}this.applicationInfoManager=applicationInfoManager;InstanceInfomyInfo=applicationInfoManager.getInfo();clientConfig=config;staticClientConfig=clientConfig;transportConfig=config.getTransportConfig();instanceInfo=myInfo;if(myInfo!=null){appPathIdentifier=instanceInfo.getAppName()+"/"+instanceInfo.getId();}else{logger.warn("SettinginstanceInfotoapassedinnullvalue");}this.backupRegistryProvider=backupRegistryProvider;this.urlRandomizer=newEndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);localRegionApps.set(newApplications());fetchRegistryGeneration=newAtomicLong(0);remoteRegionsToFetch=newAtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());remoteRegionsRef=newAtomicReference<>(remoteRegionsToFetch.get()==null?null:remoteRegionsToFetch.get().split(","));if(config.shouldFetchRegistry()){this.registryStalenessMonitor=newThresholdLevelsMetric(this,METRIC_REGISTRY_PREFIX+"lastUpdateSec_",newlong[]{15L,30L,60L,120L,240L,480L});}else{this.registryStalenessMonitor=ThresholdLevelsMetric.NO_OP_METRIC;}if(config.shouldRegisterWithEureka()){this.heartbeatStalenessMonitor=newThresholdLevelsMetric(this,METRIC_REGISTRATION_PREFIX+"lastHeartbeatSec_",newlong[]{15L,30L,60L,120L,240L,480L});}else{this.heartbeatStalenessMonitor=ThresholdLevelsMetric.NO_OP_METRIC;}logger.info("InitializingEurekainregion{}",clientConfig.getRegion());if(!config.shouldRegisterWithEureka()&&!config.shouldFetchRegistry()){logger.info("Clientconfiguredtoneitherregisternorqueryfordata.");scheduler=null;heartbeatExecutor=null;cacheRefreshExecutor=null;eurekaTransport=null;instanceRegionChecker=newInstanceRegionChecker(newPropertyBasedAzToRegionMapper(config),clientConfig.getRegion());//ThisisabitofhacktoallowforexistingcodeusingDiscoveryManager.getInstance()//toworkwithDI'dDiscoveryClientDiscoveryManager.getInstance().setDiscoveryClient(this);DiscoveryManager.getInstance().setEurekaClientConfig(config);initTimestampMs=System.currentTimeMillis();logger.info("DiscoveryClientinitializedattimestamp{}withinitialinstancescount:{}",initTimestampMs,this.getApplications().size());return;//noneedtosetupupannetworktasksandwearedone}try{//defaultsizeof2-1eachforheartbeatandcacheRefreshscheduler=Executors.newScheduledThreadPool(2,newThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());heartbeatExecutor=newThreadPoolExecutor(1,clientConfig.getHeartbeatExecutorThreadPoolSize(),0,TimeUnit.SECONDS,newSynchronousQueue<Runnable>(),newThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());//usedirecthandoffcacheRefreshExecutor=newThreadPoolExecutor(1,clientConfig.getCacheRefreshExecutorThreadPoolSize(),0,TimeUnit.SECONDS,newSynchronousQueue<Runnable>(),newThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());//usedirecthandoffeurekaTransport=newEurekaTransport();scheduleServerEndpointTask(eurekaTransport,args);AzToRegionMapperazToRegionMapper;if(clientConfig.shouldUseDnsForFetchingServiceUrls()){azToRegionMapper=newDNSBasedAzToRegionMapper(clientConfig);}else{azToRegionMapper=newPropertyBasedAzToRegionMapper(clientConfig);}if(null!=remoteRegionsToFetch.get()){azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));}instanceRegionChecker=newInstanceRegionChecker(azToRegionMapper,clientConfig.getRegion());}catch(Throwablee){thrownewRuntimeException("FailedtoinitializeDiscoveryClient!",e);}if(clientConfig.shouldFetchRegistry()&&!fetchRegistry(false)){fetchRegistryFromBackup();}//callandexecutethepreregistrationhandlerbeforeallbackgroundtasks(incregistration)isstartedif(this.preRegistrationHandler!=null){this.preRegistrationHandler.beforeRegistration();}initScheduledTasks();try{Monitors.registerObject(this);}catch(Throwablee){logger.warn("Cannotregistertimers",e);}//ThisisabitofhacktoallowforexistingcodeusingDiscoveryManager.getInstance()//toworkwithDI'dDiscoveryClientDiscoveryManager.getInstance().setDiscoveryClient(this);DiscoveryManager.getInstance().setEurekaClientConfig(config);initTimestampMs=System.currentTimeMillis();logger.info("DiscoveryClientinitializedattimestamp{}withinitialinstancescount:{}",initTimestampMs,this.getApplications().size());}

2.进入initScheduledTasks()方法,里面有两个大if代码块,"clientConfig.shouldRegisterWithEureka()"是核心逻辑,用于向Eureka注册

/***Initializesallscheduledtasks.*/privatevoidinitScheduledTasks(){if(clientConfig.shouldFetchRegistry()){//registrycacherefreshtimerintregistryFetchIntervalSeconds=clientConfig.getRegistryFetchIntervalSeconds();intexpBackOffBound=clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(newTimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,newCacheRefreshThread()),registryFetchIntervalSeconds,TimeUnit.SECONDS);}if(clientConfig.shouldRegisterWithEureka()){intrenewalIntervalInSecs=instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();intexpBackOffBound=clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Startingheartbeatexecutor:"+"renewintervalis:"+renewalIntervalInSecs);//Heartbeattimerscheduler.schedule(newTimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,newHeartbeatThread()),renewalIntervalInSecs,TimeUnit.SECONDS);//InstanceInforeplicatorinstanceInfoReplicator=newInstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2);//burstSizestatusChangeListener=newApplicationInfoManager.StatusChangeListener(){@OverridepublicStringgetId(){return"statusChangeListener";}@Overridepublicvoidnotify(StatusChangeEventstatusChangeEvent){if(InstanceStatus.DOWN==statusChangeEvent.getStatus()||InstanceStatus.DOWN==statusChangeEvent.getPreviousStatus()){//logatwarnlevelifDOWNwasinvolvedlogger.warn("Sawlocalstatuschangeevent{}",statusChangeEvent);}else{logger.info("Sawlocalstatuschangeevent{}",statusChangeEvent);}instanceInfoReplicator.onDemandUpdate();}};if(clientConfig.shouldOnDemandUpdateStatusChange()){applicationInfoManager.registerStatusChangeListener(statusChangeListener);}instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());}else{logger.info("NotregisteringwithEurekaserverperconfiguration");}}

3.上面代码执行了onDemandUpdate()方法,进入可见:

publicbooleanonDemandUpdate(){if(rateLimiter.acquire(burstSize,allowedRatePerMinute)){scheduler.submit(newRunnable(){@Overridepublicvoidrun(){logger.debug("Executingon-demandupdateoflocalInstanceInfo");FuturelatestPeriodic=scheduledPeriodicRef.get();if(latestPeriodic!=null&&!latestPeriodic.isDone()){logger.debug("Cancelingthelatestscheduledupdate,itwillberescheduledattheendofondemandupdate");latestPeriodic.cancel(false);}InstanceInfoReplicator.this.run();}});returntrue;}else{logger.warn("IgnoringonDemandupdateduetoratelimiter");returnfalse;}}publicvoidrun(){try{discoveryClient.refreshInstanceInfo();LongdirtyTimestamp=instanceInfo.isDirtyWithTime();if(dirtyTimestamp!=null){discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}}catch(Throwablet){logger.warn("Therewasaproblemwiththeinstanceinforeplicator",t);}finally{Futurenext=scheduler.schedule(this,replicationIntervalSeconds,TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}

4.可以看见,执行了register()方法进行注册,进入:

booleanregister()throwsThrowable{logger.info(PREFIX+appPathIdentifier+":registeringservice...");EurekaHttpResponse<Void>httpResponse;try{httpResponse=eurekaTransport.registrationClient.register(instanceInfo);}catch(Exceptione){logger.warn("{}-registrationfailed{}",PREFIX+appPathIdentifier,e.getMessage(),e);throwe;}if(logger.isInfoEnabled()){logger.info("{}-registrationstatus:{}",PREFIX+appPathIdentifier,httpResponse.getStatusCode());}returnhttpResponse.getStatusCode()==204;}

5.最后函数返回204,所以当注册状态为204,即为注册成功。现在客户端注册成功了,就应该到了服务端接收注册的过程:

Eureka接收注册的过程

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

  1. 入口ApplicationResource

SpringCloud中服务注册与发现Eureka以及注册源码的示例分析

@Produces({"application/xml","application/json"})publicclassApplicationResource{/***Registersinformationaboutaparticularinstanceforan*{@linkcom.netflix.discovery.shared.Application}.**@paraminfo*{@linkInstanceInfo}informationoftheinstance.*@paramisReplication*aheaderparametercontaininginformationwhetherthisis*replicatedfromothernodes.*/@POST@Consumes({"application/json","application/xml"})publicResponseaddInstance(InstanceInfoinfo,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION)StringisReplication){logger.debug("Registeringinstance{}(replication={})",info.getId(),isReplication);//validatethattheinstanceinfocontainsallthenecessaryrequiredfieldsif(isBlank(info.getId())){returnResponse.status(400).entity("MissinginstanceId").build();}elseif(isBlank(info.getHostName())){returnResponse.status(400).entity("Missinghostname").build();}elseif(isBlank(info.getAppName())){returnResponse.status(400).entity("MissingappName").build();}elseif(!appName.equals(info.getAppName())){returnResponse.status(400).entity("MismatchedappName,expecting"+appName+"butwas"+info.getAppName()).build();}elseif(info.getDataCenterInfo()==null){returnResponse.status(400).entity("MissingdataCenterInfo").build();}elseif(info.getDataCenterInfo().getName()==null){returnResponse.status(400).entity("MissingdataCenterInfoName").build();}//handlecaseswhereclientsmayberegisteringwithbadDataCenterInfowithmissingdataDataCenterInfodataCenterInfo=info.getDataCenterInfo();if(dataCenterInfoinstanceofUniqueIdentifier){StringdataCenterInfoId=((UniqueIdentifier)dataCenterInfo).getId();if(isBlank(dataCenterInfoId)){booleanexperimental="true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if(experimental){Stringentity="DataCenterInfooftype"+dataCenterInfo.getClass()+"mustcontainavalidid";returnResponse.status(400).entity(entity).build();}elseif(dataCenterInfoinstanceofAmazonInfo){AmazonInfoamazonInfo=(AmazonInfo)dataCenterInfo;StringeffectiveId=amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);if(effectiveId==null){amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(),info.getId());}}else{logger.warn("RegisteringDataCenterInfooftype{}withoutanappropriateid",dataCenterInfo.getClass());}}}registry.register(info,"true".equals(isReplication));returnResponse.status(204).build();//204tobebackwardscompatible}

2.可以看到,最终也是返回204;同时一步一步进入register方法,看是如何完成注册的。

@Override publicvoidregister(finalInstanceInfoinfo,finalbooleanisReplication){ handleRegistration(info,resolveInstanceLeaseDuration(info),isReplication); super.register(info,isReplication); }
/***Registerstheinformationaboutthe{@linkInstanceInfo}andreplicates*thisinformationtoallpeereurekanodes.Ifthisisreplicationevent*fromotherreplicanodesthenitisnotreplicated.**@paraminfo*the{@linkInstanceInfo}toberegisteredandreplicated.*@paramisReplication*trueifthisisareplicationeventfromotherreplicanodes,*falseotherwise.*/@Overridepublicvoidregister(finalInstanceInfoinfo,finalbooleanisReplication){intleaseDuration=Lease.DEFAULT_DURATION_IN_SECS;if(info.getLeaseInfo()!=null&&info.getLeaseInfo().getDurationInSecs()>0){leaseDuration=info.getLeaseInfo().getDurationInSecs();}super.register(info,leaseDuration,isReplication);replicateToPeers(Action.Register,info.getAppName(),info.getId(),info,null,isReplication);}
/***Registersanewinstancewithagivenduration.**@seecom.netflix.eureka.lease.LeaseManager#register(java.lang.Object,int,boolean)*/publicvoidregister(InstanceInforegistrant,intleaseDuration,booleanisReplication){try{read.lock();Map<String,Lease<InstanceInfo>>gMap=registry.get(registrant.getAppName());REGISTER.increment(isReplication);if(gMap==null){finalConcurrentHashMap<String,Lease<InstanceInfo>>gNewMap=newConcurrentHashMap<String,Lease<InstanceInfo>>();gMap=registry.putIfAbsent(registrant.getAppName(),gNewMap);if(gMap==null){gMap=gNewMap;}}Lease<InstanceInfo>existingLease=gMap.get(registrant.getId());//Retainthelastdirtytimestampwithoutoverwritingit,ifthereisalreadyaleaseif(existingLease!=null&&(existingLease.getHolder()!=null)){LongexistingLastDirtyTimestamp=existingLease.getHolder().getLastDirtyTimestamp();LongregistrationLastDirtyTimestamp=registrant.getLastDirtyTimestamp();logger.debug("Existingleasefound(existing={},provided={}",existingLastDirtyTimestamp,registrationLastDirtyTimestamp);if(existingLastDirtyTimestamp>registrationLastDirtyTimestamp){logger.warn("Thereisanexistingleaseandtheexistinglease'sdirtytimestamp{}isgreater"+"thantheonethatisbeingregistered{}",existingLastDirtyTimestamp,registrationLastDirtyTimestamp);logger.warn("UsingtheexistinginstanceInfoinsteadofthenewinstanceInfoastheregistrant");registrant=existingLease.getHolder();}}else{//Theleasedoesnotexistandhenceitisanewregistrationsynchronized(lock){if(this.expectedNumberOfRenewsPerMin>0){//Sincetheclientwantstocancelit,reducethethreshold//(1//for30seconds,2foraminute)this.expectedNumberOfRenewsPerMin=this.expectedNumberOfRenewsPerMin+2;this.numberOfRenewsPerMinThreshold=(int)(this.expectedNumberOfRenewsPerMin*serverConfig.getRenewalPercentThreshold());}}logger.debug("Nopreviousleaseinformationfound;itisnewregistration");}Lease<InstanceInfo>lease=newLease<InstanceInfo>(registrant,leaseDuration);if(existingLease!=null){lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}gMap.put(registrant.getId(),lease);synchronized(recentRegisteredQueue){recentRegisteredQueue.add(newPair<Long,String>(System.currentTimeMillis(),registrant.getAppName()+"("+registrant.getId()+")"));}//Thisiswheretheinitialstatetransferofoverriddenstatushappensif(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())){logger.debug("Foundoverriddenstatus{}forinstance{}.Checkingtoseeifneedstobeaddtothe"+"overrides",registrant.getOverriddenStatus(),registrant.getId());if(!overriddenInstanceStatusMap.containsKey(registrant.getId())){logger.info("Notfoundoverriddenid{}andhenceaddingit",registrant.getId());overriddenInstanceStatusMap.put(registrant.getId(),registrant.getOverriddenStatus());}}InstanceStatusoverriddenStatusFromMap=overriddenInstanceStatusMap.get(registrant.getId());if(overriddenStatusFromMap!=null){logger.info("Storingoverriddenstatus{}frommap",overriddenStatusFromMap);registrant.setOverriddenStatus(overriddenStatusFromMap);}//SetthestatusbasedontheoverriddenstatusrulesInstanceStatusoverriddenInstanceStatus=getOverriddenInstanceStatus(registrant,existingLease,isReplication);registrant.setStatusWithoutDirty(overriddenInstanceStatus);//IftheleaseisregisteredwithUPstatus,setleaseserviceuptimestampif(InstanceStatus.UP.equals(registrant.getStatus())){lease.serviceUp();}registrant.setActionType(ActionType.ADDED);recentlyChangedQueue.add(newRecentlyChangedItem(lease));registrant.setLastUpdatedTimestamp();invalidateCache(registrant.getAppName(),registrant.getVIPAddress(),registrant.getSecureVipAddress());logger.info("Registeredinstance{}/{}withstatus{}(replication={})",registrant.getAppName(),registrant.getId(),registrant.getStatus(),isReplication);}finally{read.unlock();}}

由于Eureka并没有数据库,索引通过map放在数据库中。

 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:SpringCloud中服务注册与发现Eureka以及注册源码的示例分析的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:Redis分布式缓存安装和使用下一篇:

26 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18