@@ -16,14 +16,14 @@ use super::cluster::model::{
1616use super :: cluster:: node_manage:: { InnerNodeManage , NodeManageRequest } ;
1717use super :: filter:: InstanceFilterUtils ;
1818use super :: listener:: { InnerNamingListener , ListenerItem , NamingListenerCmd } ;
19- use super :: model:: InstanceKey ;
2019use super :: model:: InstanceShortKey ;
2120use super :: model:: InstanceUpdateTag ;
2221use super :: model:: ServiceDetailDto ;
2322use super :: model:: ServiceInfo ;
2423use super :: model:: ServiceKey ;
2524use super :: model:: UpdateInstanceType ;
2625use super :: model:: { DistroData , Instance } ;
26+ use super :: model:: { InstanceKey , UpdatePerpetualType } ;
2727use super :: naming_delay_nofity:: DelayNotifyActor ;
2828use super :: naming_delay_nofity:: DelayNotifyCmd ;
2929use super :: naming_subscriber:: NamingListenerItem ;
@@ -67,8 +67,11 @@ use crate::naming::model::actor_model::{
6767 SnapshotLoadRequest ,
6868} ;
6969use crate :: naming:: sniffing:: { NetSniffing , NetSniffingCmd } ;
70+ use crate :: raft:: cluster:: route:: RaftRequestRoute ;
7071use crate :: raft:: filestore:: model:: SnapshotRecordDto ;
7172use crate :: raft:: filestore:: raftapply:: { RaftApplyDataRequest , RaftApplyDataResponse } ;
73+ use crate :: raft:: network:: core:: RaftRouter ;
74+ use crate :: raft:: store:: { ClientRequest , ClientResponse } ;
7275use actix:: prelude:: * ;
7376use quick_protobuf:: { BytesReader , Writer } ;
7477use regex:: Regex ;
@@ -97,6 +100,7 @@ pub struct NamingActor {
97100 pub ( crate ) net_sniffing_addr : Option < Addr < NetSniffing > > ,
98101 pub ( crate ) last_perpetual_instance_probe_time : i32 ,
99102 //dal_addr: Addr<ServiceDalActor>,
103+ pub ( crate ) raft_router : Option < Arc < RaftRequestRoute > > ,
100104}
101105
102106impl Actor for NamingActor {
@@ -134,10 +138,13 @@ impl Inject for NamingActor {
134138 self . node_id = sys_config. raft_node_id ;
135139 log:: info!( "NamingActor change naming timeout info from env,health_timeout:{},instance_timeout:{}"
136140 , self . sys_config. instance_health_timeout_millis, self . sys_config. instance_timeout_millis) ;
137- self . sys_config . perpetual_instance_probe_interval =
138- sys_config. naming_perpetual_instance_probe_interval ;
141+ if sys_config. naming_perpetual_instance_probe_interval > 0 {
142+ self . sys_config . perpetual_instance_probe_interval =
143+ sys_config. naming_perpetual_instance_probe_interval ;
144+ self . net_sniffing_addr = factory_data. get_actor ( ) ;
145+ }
139146 }
140- self . net_sniffing_addr = factory_data. get_actor ( ) ;
147+ self . raft_router = factory_data. get_bean ( ) ;
141148 self . instance_time_out_heartbeat ( ctx) ;
142149 log:: info!( "NamingActor inject complete" ) ;
143150 }
@@ -172,6 +179,7 @@ impl NamingActor {
172179 disable_notify : false ,
173180 net_sniffing_addr : None ,
174181 last_perpetual_instance_probe_time : 0 ,
182+ raft_router : None ,
175183 }
176184 }
177185
@@ -360,13 +368,14 @@ impl NamingActor {
360368 key : & ServiceKey ,
361369 instance_id : & InstanceShortKey ,
362370 client_id : Option < & Arc < String > > ,
363- ) -> UpdateInstanceType {
371+ ) -> ( UpdateInstanceType , UpdatePerpetualType ) {
364372 let service = if let Some ( service) = self . service_map . get_mut ( key) {
365373 service
366374 } else {
367- return UpdateInstanceType :: None ;
375+ return ( UpdateInstanceType :: None , UpdatePerpetualType :: None ) ;
368376 } ;
369377 let mut real_client_id = None ;
378+ let mut perpetual_tag = UpdatePerpetualType :: None ;
370379 let old_instance = service. remove_instance ( instance_id, client_id) ;
371380 let now = now_millis ( ) ;
372381 let tag = if let Some ( old_instance) = & old_instance {
@@ -380,6 +389,9 @@ impl NamingActor {
380389 instance_key,
381390 ) ;
382391 }
392+ if !old_instance. ephemeral {
393+ perpetual_tag = UpdatePerpetualType :: Remove ;
394+ }
383395 UpdateInstanceType :: Remove
384396 } else {
385397 UpdateInstanceType :: None
@@ -397,7 +409,7 @@ impl NamingActor {
397409 self . remove_client_instance_key ( & client_id, & instance_key) ;
398410 }
399411 }
400- tag
412+ ( tag, perpetual_tag )
401413 }
402414
403415 pub fn update_instance (
@@ -406,6 +418,7 @@ impl NamingActor {
406418 mut instance : Instance ,
407419 tag : Option < InstanceUpdateTag > ,
408420 from_sync : bool ,
421+ self_addr : Option < Addr < Self > > ,
409422 ) -> UpdateInstanceType {
410423 instance. init ( ) ;
411424 //assert!(instance.check_valid());
@@ -441,14 +454,32 @@ impl NamingActor {
441454 }
442455 let instance_short_key = instance. get_short_key ( ) ;
443456
444- let ( tag, replace_old_client_id) = service. update_instance ( instance, tag, from_sync) ;
457+ let ( tag, replace_old_client_id, perpetua_type) =
458+ service. update_instance ( instance, tag, from_sync) ;
445459 #[ cfg( feature = "debug" ) ]
446460 log:: info!(
447461 "update_instance tag:{:?},key:{:?},replace_old_client_id:{:?}" ,
448462 & tag,
449463 instance_key,
450464 & replace_old_client_id
451465 ) ;
466+ if !from_sync {
467+ if let Some ( self_addr) = self_addr {
468+ match perpetua_type {
469+ UpdatePerpetualType :: New | UpdatePerpetualType :: Update => {
470+ let instance = service. get_instance ( & instance_short_key) ;
471+ if let Some ( instance) = instance. clone ( ) {
472+ self_addr. do_send ( NamingCmd :: NotifyUpdateRaftInstance ( instance) ) ;
473+ }
474+ }
475+ UpdatePerpetualType :: Remove => {
476+ self_addr
477+ . do_send ( NamingCmd :: NotifyRemoveRaftInstance ( instance_key. clone ( ) ) ) ;
478+ }
479+ UpdatePerpetualType :: None => { }
480+ }
481+ }
482+ }
452483 if let UpdateInstanceType :: UpdateOtherClusterMetaData ( _, _) = & tag {
453484 return tag;
454485 }
@@ -1052,7 +1083,7 @@ impl NamingActor {
10521083 self . update_service ( service_detail) ;
10531084 }
10541085 for mut instance in snapshot. instances {
1055- self . update_instance ( & instance. get_service_key ( ) , instance, None , true ) ;
1086+ self . update_instance ( & instance. get_service_key ( ) , instance, None , true , None ) ;
10561087 }
10571088 }
10581089
@@ -1125,7 +1156,7 @@ impl NamingActor {
11251156 let instance: Instance = param. into ( ) ;
11261157 let service_key = instance. get_service_key ( ) ;
11271158 let short_key = instance. get_short_key ( ) ;
1128- self . update_instance ( & service_key, instance, None , false ) ;
1159+ self . update_instance ( & service_key, instance, None , true , None ) ;
11291160 if let Some ( instance) = self . get_instance ( & service_key, & short_key) {
11301161 Ok ( NamingRaftResult :: InstanceInfo ( instance) )
11311162 } else {
@@ -1140,7 +1171,7 @@ impl NamingActor {
11401171 if !param. ephemeral {
11411172 let instance: Instance = param. into ( ) ;
11421173 let service_key = instance. get_service_key ( ) ;
1143- self . update_instance ( & service_key, instance, None , false ) ;
1174+ self . update_instance ( & service_key, instance, None , true , None ) ;
11441175 Ok ( NamingRaftResult :: None )
11451176 } else {
11461177 Ok ( NamingRaftResult :: None )
@@ -1195,7 +1226,7 @@ impl NamingActor {
11951226 let instance_do: InstanceDo = reader. read_message ( & record. value ) ?;
11961227 let instance = Instance :: from_do ( instance_do) ;
11971228 let service_key = instance. get_service_key ( ) ;
1198- self . update_instance ( & service_key, instance, None , true ) ;
1229+ self . update_instance ( & service_key, instance, None , true , None ) ;
11991230 }
12001231 Ok ( ( ) )
12011232 }
@@ -1204,6 +1235,65 @@ impl NamingActor {
12041235 fn load_completed ( & mut self ) -> anyhow:: Result < ( ) > {
12051236 Ok ( ( ) )
12061237 }
1238+
1239+ fn update_instance_to_raft ( & mut self , instance : Arc < Instance > , ctx : & mut Context < Self > ) {
1240+ let raft_router = self . raft_router . clone ( ) ;
1241+ let req = NamingRaftReq :: UpdateInstance {
1242+ param : instance. as_ref ( ) . into ( ) ,
1243+ } ;
1244+ Self :: raft_request_with_retry ( req, raft_router, 2 )
1245+ . into_actor ( self )
1246+ . map ( |res, _, _| { } )
1247+ . spawn ( ctx) ;
1248+ }
1249+
1250+ fn remove_instance_to_raft ( & mut self , instance_key : InstanceKey , ctx : & mut Context < Self > ) {
1251+ let raft_router = self . raft_router . clone ( ) ;
1252+ let req = NamingRaftReq :: RemoveInstance ( instance_key) ;
1253+ Self :: raft_request_with_retry ( req, raft_router, 2 )
1254+ . into_actor ( self )
1255+ . map ( |res, _, _| { } )
1256+ . spawn ( ctx) ;
1257+ }
1258+
1259+ pub ( crate ) async fn raft_request_with_retry (
1260+ req : NamingRaftReq ,
1261+ raft_router : Option < Arc < RaftRequestRoute > > ,
1262+ retry : i32 ,
1263+ ) -> anyhow:: Result < NamingRaftResult > {
1264+ if raft_router. is_none ( ) {
1265+ return Err ( anyhow:: anyhow!( "raft_router is None" ) ) ;
1266+ }
1267+ let mut i = 0 ;
1268+ while i < retry {
1269+ let res = Self :: raft_request ( req. clone ( ) , raft_router. clone ( ) ) . await ;
1270+ if res. is_ok ( ) {
1271+ return res;
1272+ }
1273+ i += 1 ;
1274+ tokio:: time:: sleep ( Duration :: from_secs ( 3 ) ) . await ;
1275+ }
1276+ Self :: raft_request ( req, raft_router) . await
1277+ }
1278+
1279+ pub ( crate ) async fn raft_request (
1280+ req : NamingRaftReq ,
1281+ raft_router : Option < Arc < RaftRequestRoute > > ,
1282+ ) -> anyhow:: Result < NamingRaftResult > {
1283+ let raft_router = if let Some ( r) = raft_router {
1284+ r
1285+ } else {
1286+ return Err ( anyhow:: anyhow!( "raft_router is None" ) ) ;
1287+ } ;
1288+ let res = raft_router
1289+ . request ( ClientRequest :: NamingReq { req } )
1290+ . await ?;
1291+ if let ClientResponse :: NamingResp { resp } = res {
1292+ Ok ( resp)
1293+ } else {
1294+ Err ( anyhow:: anyhow!( "raft_request error" ) )
1295+ }
1296+ }
12071297}
12081298
12091299#[ derive( Debug , Message ) ]
@@ -1254,6 +1344,8 @@ pub enum NamingCmd {
12541344 service_keys : Vec < ServiceKey > ,
12551345 success : bool ,
12561346 } ,
1347+ NotifyUpdateRaftInstance ( Arc < Instance > ) ,
1348+ NotifyRemoveRaftInstance ( InstanceKey ) ,
12571349}
12581350
12591351pub enum NamingResult {
@@ -1289,15 +1381,22 @@ impl Handler<NamingCmd> for NamingActor {
12891381 //log::info!("NamingActor handle:{:?}", &msg);
12901382 match msg {
12911383 NamingCmd :: Update ( instance, tag) => {
1292- let tag = self . update_instance ( & instance. get_service_key ( ) , instance, tag, false ) ;
1384+ let tag = self . update_instance (
1385+ & instance. get_service_key ( ) ,
1386+ instance,
1387+ tag,
1388+ false ,
1389+ Some ( ctx. address ( ) ) ,
1390+ ) ;
12931391 if let UpdateInstanceType :: UpdateOtherClusterMetaData ( node_id, instance) = tag {
12941392 Ok ( NamingResult :: RewriteToCluster ( node_id, instance) )
12951393 } else {
12961394 Ok ( NamingResult :: NULL )
12971395 }
12981396 }
12991397 NamingCmd :: UpdateFromSync ( instance, tag) => {
1300- let tag = self . update_instance ( & instance. get_service_key ( ) , instance, tag, true ) ;
1398+ let tag =
1399+ self . update_instance ( & instance. get_service_key ( ) , instance, tag, true , None ) ;
13011400 if let UpdateInstanceType :: UpdateOtherClusterMetaData ( node_id, instance) = tag {
13021401 Ok ( NamingResult :: RewriteToCluster ( node_id, instance) )
13031402 } else {
@@ -1306,16 +1405,20 @@ impl Handler<NamingCmd> for NamingActor {
13061405 }
13071406 NamingCmd :: UpdateBatch ( instances) => {
13081407 for mut instance in instances {
1309- self . update_instance ( & instance. get_service_key ( ) , instance, None , true ) ;
1408+ self . update_instance ( & instance. get_service_key ( ) , instance, None , true , None ) ;
13101409 }
13111410 Ok ( NamingResult :: NULL )
13121411 }
13131412 NamingCmd :: Delete ( instance) => {
1314- self . remove_instance (
1413+ let ( _ , perpetual_tag ) = self . remove_instance (
13151414 & instance. get_service_key ( ) ,
13161415 & instance. get_short_key ( ) ,
13171416 Some ( & instance. client_id ) ,
13181417 ) ;
1418+ if let UpdatePerpetualType :: Remove = perpetual_tag {
1419+ let instance_key = instance. get_instance_key ( ) ;
1420+ self . remove_instance_to_raft ( instance_key, ctx) ;
1421+ }
13191422 Ok ( NamingResult :: NULL )
13201423 }
13211424 NamingCmd :: DeleteBatch ( instances) => {
@@ -1517,6 +1620,14 @@ impl Handler<NamingCmd> for NamingActor {
15171620 self . update_perpetual_health ( host, service_keys, success) ;
15181621 Ok ( NamingResult :: NULL )
15191622 }
1623+ NamingCmd :: NotifyUpdateRaftInstance ( instance) => {
1624+ self . update_instance_to_raft ( instance, ctx) ;
1625+ Ok ( NamingResult :: NULL )
1626+ }
1627+ NamingCmd :: NotifyRemoveRaftInstance ( instance_key) => {
1628+ self . remove_instance_to_raft ( instance_key, ctx) ;
1629+ Ok ( NamingResult :: NULL )
1630+ }
15201631 }
15211632 }
15221633}
@@ -1561,7 +1672,7 @@ async fn query_healthy_instances() {
15611672 instance. cluster_name = "DEFUALT" . to_owned ( ) ;
15621673 instance. init ( ) ;
15631674 let key = instance. get_service_key ( ) ;
1564- naming. update_instance ( & key, instance, None , false ) ;
1675+ naming. update_instance ( & key, instance, None , false , None ) ;
15651676 if let Some ( service) = naming. service_map . get_mut ( & key) {
15661677 service. protect_threshold = 0.1 ;
15671678 }
@@ -1625,7 +1736,7 @@ fn test_remove_has_instance_service() {
16251736 instance. cluster_name = "DEFUALT" . to_owned ( ) ;
16261737 instance. init ( ) ;
16271738 let service_key = instance. get_service_key ( ) ;
1628- naming. update_instance ( & service_key, instance. clone ( ) , None , false ) ;
1739+ naming. update_instance ( & service_key, instance. clone ( ) , None , false , None ) ;
16291740 let service_info = ServiceDetailDto {
16301741 namespace_id : service_key. namespace_id . clone ( ) ,
16311742 service_name : service_key. service_name . clone ( ) ,
0 commit comments