今日熱搜:4 個維度搞懂 Nacos 注冊中心
現(xiàn)如今市面上注冊中心的輪子很多,我實際使用過的就有三款:Eureka、Gsched、Nacos,由于當(dāng)前參與 Nacos 集群的維護和開發(fā)工作,期間也參與了 Nacos 社區(qū)的一些開發(fā)和 Bug Fix 工作,過程中對 Nacos 原理有了一定的積累,今天給大家分享一下 Nacos 動態(tài)服務(wù)發(fā)現(xiàn)的原理。
【資料圖】
不 BB,上文章目錄:
01 什么是動態(tài)服務(wù)發(fā)現(xiàn)?
服務(wù)發(fā)現(xiàn)是指使用一個注冊中心來記錄分布式系統(tǒng)中的全部服務(wù)的信息,以便其他服務(wù)能夠快速的找到這些已注冊的服務(wù)。
在單體應(yīng)用中,DNS+Nginx 可以滿足服務(wù)發(fā)現(xiàn)的要求,此時服務(wù)的IP列表配置在 nginx 上。在微服務(wù)架構(gòu)中,由于服務(wù)粒度變的更細,服務(wù)的上下線更加頻繁,我們需要一款注冊中心來動態(tài)感知服務(wù)的上下線,并且推送IP列表變化給服務(wù)消費者,架構(gòu)如下圖。
02 Nacos 實現(xiàn)動態(tài)服務(wù)發(fā)現(xiàn)的原理
Nacos實現(xiàn)動態(tài)服務(wù)發(fā)現(xiàn)的核心原理如下圖,我們接下來的內(nèi)容將圍繞這個圖來進行。
2.1 通訊協(xié)議
整個服務(wù)注冊與發(fā)現(xiàn)過程,都離不開通訊協(xié)議,在1.x的 Nacos 版本中服務(wù)端只支持 http 協(xié)議,后來為了提升性能在2.x版本引入了谷歌的 grpc,grpc 是一款長連接協(xié)議,極大的減少了 http 請求頻繁的連接創(chuàng)建和銷毀過程,能大幅度提升性能,節(jié)約資源。
據(jù)官方測試,Nacos服務(wù)端 grpc 版本,相比 http 版本的性能提升了9倍以上。
2.2 Nacos 服務(wù)注冊
簡單來講,服務(wù)注冊的目的就是客戶端將自己的ip端口等信息上報給 Nacos 服務(wù)端,過程如下:
創(chuàng)建長連接:Nacos SDK 通過Nacos服務(wù)端域名解析出服務(wù)端ip列表,選擇其中一個ip創(chuàng)建 grpc 連接,并定時檢查連接狀態(tài),當(dāng)連接斷開,則自動選擇服務(wù)端ip列表中的下一個ip進行重連。
健康檢查請求:在正式發(fā)起注冊之前,Nacos SDK 向服務(wù)端發(fā)送一個空請求,服務(wù)端回應(yīng)一個空請求,若Nacos SDK 未收到服務(wù)端回應(yīng),則認為服務(wù)端不健康,并進行一定次數(shù)重試,如果都未收到回應(yīng),則注冊失敗。
發(fā)起注冊:當(dāng)你查看Nacos java SDK的注冊方法時,你會發(fā)現(xiàn)沒有返回值,這是因為Nacos SDK做了補償機制,在真實給服務(wù)端上報數(shù)據(jù)之前,會先往緩存中插入一條記錄表示開始注冊,注冊成功之后再從緩存中標(biāo)記這條記錄為注冊成功,當(dāng)注冊失敗時,緩存中這條記錄是未注冊成功的狀態(tài),Nacos SDK開啟了一個定時任務(wù),定時查詢異常的緩存數(shù)據(jù),重新發(fā)起注冊。
Nacos SDK注冊失敗時的自動補償機制時序圖。
相關(guān)源碼如下:
@OverridepublicvoidregisterService(StringserviceName,StringgroupName,Instanceinstance)throwsNacosException{NAMING_LOGGER.info(\"[REGISTER-SERVICE]{}registeringservice{}withinstance{}\",namespaceId,serviceName,instance);//添加redo日志redoService.cacheInstanceForRedo(serviceName,groupName,instance);doRegisterService(serviceName,groupName,instance);}publicvoiddoRegisterService(StringserviceName,StringgroupName,Instanceinstance)throwsNacosException{//向服務(wù)端發(fā)起注冊InstanceRequestrequest=newInstanceRequest(namespaceId,serviceName,groupName,NamingRemoteConstants.REGISTER_INSTANCE,instance);requestToServer(request,Response.class);//標(biāo)記注冊成功redoService.instanceRegistered(serviceName,groupName);}
執(zhí)行補償定時任務(wù)RedoScheduledTask。
@Overridepublicvoidrun(){if(!redoService.isConnected()){LogUtils.NAMING_LOGGER.warn(\"GrpcConnectionisdisconnect,skipcurrentredotask\");return;}try{redoForInstances();redoForSubscribes();}catch(Exceptione){LogUtils.NAMING_LOGGER.warn(\"Redotaskrunwithunexpectedexception:\",e);}}privatevoidredoForInstances(){for(InstanceRedoDataeach:redoService.findInstanceRedoData()){try{redoForInstance(each);}catch(NacosExceptione){LogUtils.NAMING_LOGGER.error(\"Redoinstanceoperation{}for{}@@{}failed.\",each.getRedoType(),each.getGroupName(),each.getServiceName(),e);}}}
服務(wù)端數(shù)據(jù)同步(Distro協(xié)議):Nacos SDK只會與服務(wù)端某個節(jié)點建立長連接,當(dāng)服務(wù)端接受到客戶端注冊的實例數(shù)據(jù)后,還需要將實例數(shù)據(jù)同步給其他節(jié)點。Nacos自己實現(xiàn)了一個一致性協(xié)議名為Distro,服務(wù)注冊的時候會觸發(fā)Distro一次同步,每個Nacos節(jié)點之間會定時互相發(fā)送Distro數(shù)據(jù),以此保證數(shù)據(jù)最終一致。
服務(wù)實例上線推送:Nacos服務(wù)端收到服務(wù)實例數(shù)據(jù)后會將服務(wù)的最新實例列表通過grpc推送給該服務(wù)的所有訂閱者。
服務(wù)注冊過程源碼時序圖:整理了一下服務(wù)注冊過程整體時序圖,對源碼實現(xiàn)感興趣的可以按照根據(jù)這個時序圖view一下源碼。
2.3 Nacos 心跳機制
目前主流的注冊中心,比如Consul、Eureka、Zk包括我們公司自研的Gsched,都是通過心跳機制來感知服務(wù)的下線。Nacos也是通過心跳機制來實現(xiàn)的。
Nacos目前SDK維護了兩個分支的版本(1.x、2.x),這兩個版本心跳機制的實現(xiàn)不一樣。其中1.x版本的SDK通過http協(xié)議來定時向服務(wù)端發(fā)送心跳維持自己的健康狀態(tài),2.x版本的SDK則通過grpc自身的心跳機制來?;睿?dāng)Nacos服務(wù)端接受不到服務(wù)實例的心跳,會認為實例下線。如下圖:
grpc監(jiān)測到連接斷開事件,發(fā)送ClientDisconnectEvent。
publicclassConnectionBasedClientManagerextendsClientConnectionEventListenerimplementsClientManager{//連接斷開,發(fā)送連接斷開事件publicbooleanclientDisconnected(StringclientId){Loggers.SRV_LOG.info(\"Clientconnection{}disconnect,removeinstancesandsubscribers\",clientId);ConnectionBasedClientclient=clients.remove(clientId);if(null==client){returntrue;}client.release();NotifyCenter.publishEvent(newClientEvent.ClientDisconnectEvent(client));returntrue;}}
移除客戶端注冊的服務(wù)實例
publicclassClientServiceIndexesManagerextendsSmartSubscriber{@OverridepublicvoidonEvent(Eventevent){//接收失去連接事件if(eventinstanceofClientEvent.ClientDisconnectEvent){handleClientDisconnect((ClientEvent.ClientDisconnectEvent)event);}elseif(eventinstanceofClientOperationEvent){handleClientOperation((ClientOperationEvent)event);}}privatevoidhandleClientDisconnect(ClientEvent.ClientDisconnectEventevent){Clientclient=event.getClient();for(Serviceeach:client.getAllSubscribeService()){removeSubscriberIndexes(each,client.getClientId());}//移除客戶端注冊的服務(wù)實例for(Serviceeach:client.getAllPublishedService()){removePublisherIndexes(each,client.getClientId());}}//移除客戶端注冊的服務(wù)實例privatevoidremovePublisherIndexes(Serviceservice,StringclientId){if(!publisherIndexes.containsKey(service)){return;}publisherIndexes.get(service).remove(clientId);NotifyCenter.publishEvent(newServiceEvent.ServiceChangedEvent(service,true));}}
2.4 Nacos 服務(wù)訂閱
當(dāng)一個服務(wù)發(fā)生上下線,Nacos如何知道要推送給哪些客戶端?
Nacos SDK 提供了訂閱和取消訂閱方法,當(dāng)客戶端向服務(wù)端發(fā)起訂閱請求,服務(wù)端會記錄發(fā)起調(diào)用的客戶端為該服務(wù)的訂閱者,同時將服務(wù)的最新實例列表返回。當(dāng)客戶端發(fā)起了取消訂閱,服務(wù)端就會從該服務(wù)的訂閱者列表中把當(dāng)前客戶端移除。
當(dāng)客戶端發(fā)起訂閱時,服務(wù)端除了會同步返回最新的服務(wù)實例列表,還會異步的通過grpc推送給該訂閱者最新的服務(wù)實例列表,這樣做的目的是為了異步更新客戶端本地緩存的服務(wù)數(shù)據(jù)。
當(dāng)客戶端訂閱的服務(wù)上下線,該服務(wù)所有的訂閱者會立刻收到最新的服務(wù)列表并且將服務(wù)最新的實例數(shù)據(jù)更新到內(nèi)存。
我們也看一下相關(guān)源碼,服務(wù)端接收到訂閱數(shù)據(jù),首先保存到內(nèi)存中。
@OverridepublicvoidsubscribeService(Serviceservice,Subscribersubscriber,StringclientId){Servicesingleton=ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Clientclient=clientManager.getClient(clientId);//校驗長連接是否正常if(!clientIsLegal(client,clientId)){return;}//保存訂閱數(shù)據(jù)client.addServiceSubscriber(singleton,subscriber);client.setLastUpdatedTime();//發(fā)送訂閱事件NotifyCenter.publishEvent(newClientOperationEvent.ClientSubscribeServiceEvent(singleton,clientId));}privatevoidhandleClientOperation(ClientOperationEventevent){Serviceservice=event.getService();StringclientId=event.getClientId();if(eventinstanceofClientOperationEvent.ClientRegisterServiceEvent){addPublisherIndexes(service,clientId);}elseif(eventinstanceofClientOperationEvent.ClientDeregisterServiceEvent){removePublisherIndexes(service,clientId);}elseif(eventinstanceofClientOperationEvent.ClientSubscribeServiceEvent){//處理訂閱操作addSubscriberIndexes(service,clientId);}elseif(eventinstanceofClientOperationEvent.ClientUnsubscribeServiceEvent){removeSubscriberIndexes(service,clientId);}}
然后發(fā)布訂閱事件。
privatevoidaddSubscriberIndexes(Serviceservice,StringclientId){//保存訂閱數(shù)據(jù)subscriberIndexes.computeIfAbsent(service,(key)->newConcurrentHashSet<>());//Fix#5404,Onlyfirsttimeaddneednotifyevent.if(subscriberIndexes.get(service).add(clientId)){//發(fā)布訂閱事件NotifyCenter.publishEvent(newServiceEvent.ServiceSubscribedEvent(service,clientId));}}
服務(wù)端自己消費訂閱事件,并且推送給訂閱的客戶端最新的服務(wù)實例數(shù)據(jù)。
@OverridepublicvoidonEvent(Eventevent){if(!upgradeJudgement.isUseGrpcFeatures()){return;}if(eventinstanceofServiceEvent.ServiceChangedEvent){//Ifservicechanged,pushtoallsubscribers.ServiceEvent.ServiceChangedEventserviceChangedEvent=(ServiceEvent.ServiceChangedEvent)event;Serviceservice=serviceChangedEvent.getService();delayTaskEngine.addTask(service,newPushDelayTask(service,PushConfig.getInstance().getPushTaskDelay()));}elseif(eventinstanceofServiceEvent.ServiceSubscribedEvent){//Ifserviceissubscribedbyoneclient,onlypushthisclient.ServiceEvent.ServiceSubscribedEventsubscribedEvent=(ServiceEvent.ServiceSubscribedEvent)event;Serviceservice=subscribedEvent.getService();delayTaskEngine.addTask(service,newPushDelayTask(service,PushConfig.getInstance().getPushTaskDelay(),subscribedEvent.getClientId()));}}
2.5 Nacos 推送
推送方式
前面說了服務(wù)的注冊和訂閱都會發(fā)生推送(服務(wù)端->客戶端),那推送到底是如何實現(xiàn)的呢?
在早期的Nacos版本,當(dāng)服務(wù)實例變化,服務(wù)端會通過udp協(xié)議將最新的數(shù)據(jù)發(fā)送給客戶端,后來發(fā)現(xiàn)udp推送有一定的丟包率,于是新版本的Nacos支持了grpc推送。Nacos服務(wù)端會自動判斷客戶端的版本來選擇哪種方式來進行推送,如果你使用1.4.2以前的SDK進行注冊,那Nacos服務(wù)端會使用udp協(xié)議來進行推送,反之則使用grpc。
推送失敗重試
當(dāng)發(fā)送推送時,客戶端可能正在重啟,或者連接不穩(wěn)定導(dǎo)致推送失敗,這個時候Nacos會進行重試。Nacos將每個推送都封裝成一個任務(wù)對象,放入到隊列中,再開啟一個線程不停的從隊列取出任務(wù)執(zhí)行,執(zhí)行之前會先刪除該任務(wù),如果執(zhí)行失敗則將任務(wù)重新添加到隊列,該線程會記錄任務(wù)執(zhí)行的時間,如果超過1秒,則會記錄到日志。
推送部分源碼
添加推送任務(wù)到執(zhí)行隊列中。
privatestaticclassPushDelayTaskProcessorimplementsNacosTaskProcessor{privatefinalPushDelayTaskExecuteEngineexecuteEngine;publicPushDelayTaskProcessor(PushDelayTaskExecuteEngineexecuteEngine){this.executeEngine=executeEngine;}@Overridepublicbooleanprocess(NacosTasktask){PushDelayTaskpushDelayTask=(PushDelayTask)task;Serviceservice=pushDelayTask.getService();NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service,newPushExecuteTask(service,executeEngine,pushDelayTask));returntrue;}}
推送任務(wù)PushExecuteTask 的執(zhí)行。
publicclassPushExecuteTaskextendsAbstractExecuteTask{//..省略@Overridepublicvoidrun(){try{//封裝要推送的服務(wù)實例數(shù)據(jù)PushDataWrapperwrapper=generatePushData();ClientManagerclientManager=delayTaskEngine.getClientManager();//如果是服務(wù)上下線導(dǎo)致的推送,獲取所有訂閱者//如果是訂閱導(dǎo)致的推送,獲取訂閱者for(Stringeach:getTargetClientIds()){Clientclient=clientManager.getClient(each);if(null==client){//meansthisclienthasdisconnectcontinue;}Subscribersubscriber=clientManager.getClient(each).getSubscriber(service);//推送給訂閱者delayTaskEngine.getPushExecutor().doPushWithCallback(each,subscriber,wrapper,newNamingPushCallback(each,subscriber,wrapper.getOriginalData(),delayTask.isPushToAll()));}}catch(Exceptione){Loggers.PUSH.error(\"Pushtaskforservice\"+service.getGroupedServiceName()+\"executefailed\",e);//當(dāng)推送發(fā)生異常,重新將推送任務(wù)放入執(zhí)行隊列delayTaskEngine.addTask(service,newPushDelayTask(service,1000L));}}//如果是服務(wù)上下線導(dǎo)致的推送,獲取所有訂閱者//如果是訂閱導(dǎo)致的推送,獲取訂閱者privateCollectiongetTargetClientIds(){returndelayTask.isPushToAll()?delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service):delayTask.getTargetClients();}
執(zhí)行推送任務(wù)線程InnerWorker 的執(zhí)行。
/***Innerexecuteworker.*/privateclassInnerWorkerextendsThread{InnerWorker(Stringname){setDaemon(false);setName(name);}@Overridepublicvoidrun(){while(!closed.get()){try{//從隊列中取出任務(wù)PushExecuteTaskRunnabletask=queue.take();longbegin=System.currentTimeMillis();//執(zhí)行PushExecuteTasktask.run();longduration=System.currentTimeMillis()-begin;if(duration>1000L){log.warn(\"task{}takes{}ms\",task,duration);}}catch(Throwablee){log.error(\"[TASK-FAILED]\"+e.toString(),e);}}}}
2.6 Nacos SDK 查詢服務(wù)實例
服務(wù)消費者首先需要調(diào)用Nacos SDK的接口來獲取最新的服務(wù)實例,然后才能從獲取到的實例列表中以加權(quán)輪詢的方式選擇出一個實例(包含ip,port等信息),最后再發(fā)起調(diào)用。
前面已經(jīng)提到Nacos服務(wù)發(fā)生上下線、訂閱的時候都會推送最新的服務(wù)實例列表到當(dāng)客戶端,客戶端再更新本地內(nèi)存中的緩沖數(shù)據(jù),所以調(diào)用Nacos SDK提供的查詢實例列表的接口時,不會直接請求服務(wù)端獲取數(shù)據(jù),而是會優(yōu)先使用內(nèi)存中的服務(wù)數(shù)據(jù),只有內(nèi)存中查不到的情況下才會發(fā)起訂閱請求服務(wù)端數(shù)據(jù)。
Nacos SDK內(nèi)存中的數(shù)據(jù)除了接受來自服務(wù)端的推送更新之外,自己本地也會有一個定時任務(wù)定時去獲取服務(wù)端數(shù)據(jù)來進行兜底。Nacos SDK在查詢的時候也了容災(zāi)機制,即從磁盤獲取服務(wù)數(shù)據(jù),而這個磁盤的數(shù)據(jù)其實也是來自于內(nèi)存,有一個定時任務(wù)定時從內(nèi)存緩存中獲取然后加載到磁盤。Nacos SDK的容災(zāi)機制默認關(guān)閉,可通過設(shè)置環(huán)境變量failover-mode=true來開啟。
架構(gòu)圖
用戶查詢流程
查詢服務(wù)實例部分源碼
privatefinalConcurrentMapserviceInfoMap;@OverridepublicListgetAllInstances(StringserviceName,StringgroupName,Listclusters,booleansubscribe)throwsNacosException{ServiceInfoserviceInfo;StringclusterString=StringUtils.join(clusters,\",\");//這里默認傳過來是trueif(subscribe){//從本地內(nèi)存獲取服務(wù)數(shù)據(jù),如果獲取不到則從磁盤獲取serviceInfo=serviceInfoHolder.getServiceInfo(serviceName,groupName,clusterString);if(null==serviceInfo||!clientProxy.isSubscribed(serviceName,groupName,clusterString)){//如果從本地獲取不到數(shù)據(jù),則調(diào)用訂閱方法serviceInfo=clientProxy.subscribe(serviceName,groupName,clusterString);}}else{//適用于不走訂閱,直接從服務(wù)端獲取數(shù)據(jù)的情況serviceInfo=clientProxy.queryInstancesOfService(serviceName,groupName,clusterString,0,false);}Listlist;if(serviceInfo==null||CollectionUtils.isEmpty(list=serviceInfo.getHosts())){returnnewArrayList();}returnlist;}}//從本地內(nèi)存獲取服務(wù)數(shù)據(jù),如果開啟了故障轉(zhuǎn)移則直接從磁盤獲取,因為當(dāng)服務(wù)端掛了,本地啟動時內(nèi)存中也沒有數(shù)據(jù)publicServiceInfogetServiceInfo(finalStringserviceName,finalStringgroupName,finalStringclusters){NAMING_LOGGER.debug(\"failover-mode:{}\",failoverReactor.isFailoverSwitch());StringgroupedServiceName=NamingUtils.getGroupedName(serviceName,groupName);Stringkey=ServiceInfo.getKey(groupedServiceName,clusters);//故障轉(zhuǎn)移則直接從磁盤獲取if(failoverReactor.isFailoverSwitch()){returnfailoverReactor.getService(key);}//返回內(nèi)存中數(shù)據(jù)returnserviceInfoMap.get(key);}
3. 結(jié)語
本篇文章向大家介紹 Nacos 服務(wù)發(fā)現(xiàn)的基本概念和核心能力以及實現(xiàn)的原理,旨在讓大家對 Nacos 的服務(wù)注冊與發(fā)現(xiàn)功能有更多的了解,做到心中有數(shù)。
這篇文章原作者是我好友,小米大佬胡俊,如果對 Nacos 開源感興趣的同學(xué),也可以和我聯(lián)系。
- 今日熱搜:4 個維度搞懂 Nacos
- 環(huán)球關(guān)注:云南:民警進校普法 回
- 全球今熱點:桑植縣:新技術(shù)應(yīng)用
- 環(huán)球微資訊!烈日灼人下迅雷下載_烈
- 【世界播資訊】今年青海扎實推進關(guān)
- 當(dāng)前報道:變“碳”為寶 長慶油田全
- 陜西3項考古發(fā)現(xiàn)進入全國十大考古新
- 全球熱消息:海南4市縣公開省級環(huán)保
- 天天關(guān)注:3月1日又有冷空氣南下
- 【速看料】河北大名發(fā)現(xiàn)一處兩晉時
- 當(dāng)前短訊!基金早班車|等待破局!幻
- 每日信息:省際高速公路建設(shè)平穩(wěn)高
- 一個靈魂的自述之活著為了什么
- 哈雷彗星周期為多少_哈雷彗星周期
- 摩根士丹利的Wilson預(yù)計美股3月面臨
- 全球微速訊:Lightroom Classic 2
- 世界視訊!萬全區(qū)宣平堡小學(xué)志愿服
- 當(dāng)前關(guān)注:壩陵河大橋:夕照“飛虹
- 全球今日訊!盤點統(tǒng)治過NBA的八大巨
- 新蒲新區(qū):消費市場里的“春日序曲
- 天天視訊!探訪云南優(yōu)質(zhì)馬鈴薯育種
- 精靈寶可夢大搜索:白金_關(guān)于精靈寶
- 滾動:松桃旅游市場全面回暖
- 環(huán)球觀焦點:幸福成都 美在文明|看
- 【焦點熱聞】你笑起來很好看歌詞含
- 未來十天,上海氣溫呈上升趨勢
- 環(huán)球今日訊!欽州向海經(jīng)濟生產(chǎn)總值1
- 最新消息:2023年2月24日白銀價格多
- 世界信息:讓雷鋒精神照耀城市每個角
- 世界快報:2022年陜西鞏固拓展脫貧攻