@@ -33,6 +33,7 @@ use databend_common_meta_sled_store::openraft::error::Unreachable;
3333use databend_common_meta_sled_store:: openraft:: network:: RPCOption ;
3434use databend_common_meta_sled_store:: openraft:: MessageSummary ;
3535use databend_common_meta_sled_store:: openraft:: RaftNetworkFactory ;
36+ use databend_common_meta_types:: protobuf:: RaftReply ;
3637use databend_common_meta_types:: protobuf:: RaftRequest ;
3738use databend_common_meta_types:: protobuf:: SnapshotChunkRequest ;
3839use databend_common_meta_types:: AppendEntriesRequest ;
@@ -173,15 +174,12 @@ pub struct NetworkConnection {
173174 target : NodeId ,
174175
175176 /// The node info to send message to.
177+ ///
178+ /// This is not used, because meta-service does not store node info in membership.
176179 target_node : MembershipNode ,
177180
178- /// A counter to send snapshot via v0 API.
179- ///
180- /// v0 API should only be used during upgrading a meta cluster.
181- /// During this period, i.e., this counter is `>0`,
182- /// try to send via v0 if the remote is not upgraded.
183- /// When this counter reaches 0, start sending via v1 API.
184- install_snapshot_via_v0 : u64 ,
181+ /// The endpoint of the target node.
182+ endpoint : Endpoint ,
185183
186184 sto : RaftStore ,
187185
@@ -193,7 +191,7 @@ pub struct NetworkConnection {
193191impl NetworkConnection {
194192 #[ logcall:: logcall( err = "debug" ) ]
195193 #[ minitrace:: trace]
196- pub async fn make_client ( & self ) -> Result < ( RaftClient , Endpoint ) , Unreachable > {
194+ pub async fn make_client ( & mut self ) -> Result < RaftClient , Unreachable > {
197195 let target = self . target ;
198196
199197 let endpoint = self
@@ -215,7 +213,9 @@ impl NetworkConnection {
215213 let client = RaftClientApi :: new ( target, endpoint. clone ( ) , channel) ;
216214 debug ! ( "connected: target={}: {}" , target, addr) ;
217215
218- Ok ( ( client, endpoint) )
216+ self . endpoint = endpoint;
217+
218+ Ok ( client)
219219 }
220220 Err ( err) => {
221221 raft_metrics:: network:: incr_connect_failure ( & target, & endpoint. to_string ( ) ) ;
@@ -296,22 +296,41 @@ impl NetworkConnection {
296296 policy. chain ( zero)
297297 }
298298
299- /// Convert gRPC status to `RPCError`
300- fn status_to_unreachable < E > (
299+ fn parse_grpc_resp < R , E > (
301300 & self ,
302- status : tonic:: Status ,
303- endpoint : Endpoint ,
304- ) -> RPCError < RaftError < E > >
301+ grpc_res : Result < tonic:: Response < RaftReply > , tonic:: Status > ,
302+ ) -> Result < R , RPCError < RaftError < E > > >
305303 where
304+ R : serde:: de:: DeserializeOwned + ' static ,
305+ E : serde:: de:: DeserializeOwned + ' static ,
306306 E : std:: error:: Error ,
307307 {
308+ // Return status error
309+ let resp = grpc_res. map_err ( |e| self . status_to_unreachable ( e) ) ?;
310+
311+ // Parse serialized response into `Result<RaftReply.data, RaftReply.error>`
312+ let raft_res = GrpcHelper :: parse_raft_reply :: < R , E > ( resp) . map_err ( |serde_err| {
313+ new_net_err ( & serde_err, || {
314+ let t = std:: any:: type_name :: < R > ( ) ;
315+ format ! ( "parse reply for {}" , t)
316+ } )
317+ } ) ?;
318+
319+ // Wrap RaftError with RPCError
320+ raft_res. map_err ( |e| self . to_rpc_err ( e) )
321+ }
322+
323+ /// Convert gRPC status to `RPCError`
324+ fn status_to_unreachable < E > ( & self , status : tonic:: Status ) -> RPCError < RaftError < E > >
325+ where E : std:: error:: Error {
308326 warn ! (
309327 "target={}, endpoint={} gRPC error: {:?}" ,
310- self . target, endpoint, status
328+ self . target, self . endpoint, status
311329 ) ;
312330
313331 let any_err = AnyError :: new ( & status)
314- . add_context ( || format ! ( "gRPC target={}, endpoint={}" , self . target, endpoint) ) ;
332+ . add_context ( || format ! ( "gRPC target={}, endpoint={}" , self . target, self . endpoint) ) ;
333+
315334 RPCError :: Unreachable ( Unreachable :: new ( & any_err) )
316335 }
317336}
@@ -331,7 +350,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
331350 "send_append_entries" ,
332351 ) ;
333352
334- let ( mut client, endpoint ) = self . make_client ( ) . await ?;
353+ let mut client = self . make_client ( ) . await ?;
335354
336355 let raft_req = self . new_append_entries_raft_req ( & rpc) ?;
337356 let req = GrpcHelper :: traced_req ( raft_req) ;
@@ -348,12 +367,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
348367 self . target, grpc_res
349368 ) ;
350369
351- let resp = grpc_res. map_err ( |e| self . status_to_unreachable ( e, endpoint) ) ?;
352-
353- let raft_res = GrpcHelper :: parse_raft_reply ( resp)
354- . map_err ( |serde_err| new_net_err ( & serde_err, || "parse append_entries reply" ) ) ?;
355-
356- raft_res. map_err ( |e| self . to_rpc_err ( e) )
370+ self . parse_grpc_resp ( grpc_res)
357371 }
358372
359373 #[ logcall:: logcall( err = "debug" ) ]
@@ -390,68 +404,28 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
390404
391405 let _g = snapshot_send_inflight ( self . target ) . counter_guard ( ) ;
392406
393- let ( mut client, endpoint ) = self . make_client ( ) . await ?;
407+ let mut client = self . make_client ( ) . await ?;
394408
395409 let bytes = rpc. data . len ( ) as u64 ;
396410 raft_metrics:: network:: incr_sendto_bytes ( & self . target , bytes) ;
397411
398- // Try send via `v1` API, if the remote peer does not provide `v1` API,
399- // revert to `v0` API.
400- let v1_res = if self . install_snapshot_via_v0 == 0 {
401- // Send via v1 API
402-
403- let v1_req = SnapshotChunkRequest :: new_v1 ( rpc. clone ( ) ) ;
404- let req = databend_common_tracing:: inject_span_to_tonic_request ( v1_req) ;
405- let res = client
406- . install_snapshot_v1 ( req)
407- . timed ( observe_snapshot_send_spent ( self . target ) )
408- . await ;
409-
410- if is_unimplemented ( & res) {
411- warn ! (
412- "target={} does not support install_snapshot_v1 API, fallback to v0 API for next 10 times" ,
413- self . target
414- ) ;
415- self . install_snapshot_via_v0 = 10 ;
416- None
417- } else {
418- Some ( res)
419- }
420- } else {
421- None
422- } ;
412+ let v1_req = SnapshotChunkRequest :: new_v1 ( rpc) ;
413+ let req = databend_common_tracing:: inject_span_to_tonic_request ( v1_req) ;
423414
424- let grpc_res = if let Some ( v1_res) = v1_res {
425- v1_res
426- } else {
427- // Via v1 API is not tried or failed,
428- // Send via v0 API
429-
430- self . install_snapshot_via_v0 -= 1 ;
431-
432- let req = databend_common_tracing:: inject_span_to_tonic_request ( rpc. clone ( ) ) ;
433- client
434- . install_snapshot ( req)
435- . timed ( observe_snapshot_send_spent ( self . target ) )
436- . await
437- } ;
415+ let grpc_res = client
416+ . install_snapshot_v1 ( req)
417+ . timed ( observe_snapshot_send_spent ( self . target ) )
418+ . await ;
438419
439420 info ! (
440421 "install_snapshot resp target={}: {:?}" ,
441422 self . target, grpc_res
442423 ) ;
443424
444- let resp = grpc_res. map_err ( |e| {
445- self . report_metrics_snapshot ( false ) ;
446- self . status_to_unreachable ( e, endpoint)
447- } ) ?;
448-
449- let raft_res = GrpcHelper :: parse_raft_reply ( resp)
450- . map_err ( |serde_err| new_net_err ( & serde_err, || "parse install_snapshot reply" ) ) ?;
451-
452- self . report_metrics_snapshot ( raft_res. is_ok ( ) ) ;
425+ let res = self . parse_grpc_resp ( grpc_res) ;
426+ self . report_metrics_snapshot ( res. is_ok ( ) ) ;
453427
454- raft_res . map_err ( |e| self . to_rpc_err ( e ) )
428+ res
455429 }
456430
457431 #[ logcall:: logcall( err = "debug" ) ]
@@ -463,7 +437,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
463437 ) -> Result < VoteResponse , RPCError < RaftError > > {
464438 info ! ( id = self . id, target = self . target, rpc = rpc. summary( ) ; "send_vote" ) ;
465439
466- let ( mut client, endpoint ) = self . make_client ( ) . await ?;
440+ let mut client = self . make_client ( ) . await ?;
467441
468442 let raft_req = GrpcHelper :: encode_raft_request ( & rpc) . map_err ( |e| Unreachable :: new ( & e) ) ?;
469443
@@ -475,12 +449,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
475449 let grpc_res = client. vote ( req) . await ;
476450 info ! ( "vote: resp from target={} {:?}" , self . target, grpc_res) ;
477451
478- let resp = grpc_res. map_err ( |e| self . status_to_unreachable ( e, endpoint) ) ?;
479-
480- let raft_res = GrpcHelper :: parse_raft_reply ( resp)
481- . map_err ( |serde_err| new_net_err ( & serde_err, || "parse vote reply" ) ) ?;
482-
483- raft_res. map_err ( |e| self . to_rpc_err ( e) )
452+ self . parse_grpc_resp ( grpc_res)
484453 }
485454
486455 /// When a `Unreachable` error is returned from the `Network`,
@@ -508,10 +477,10 @@ impl RaftNetworkFactory<TypeConfig> for Network {
508477 id : self . sto . id ,
509478 target,
510479 target_node : node. clone ( ) ,
511- install_snapshot_via_v0 : 0 ,
512480 sto : self . sto . clone ( ) ,
513481 conn_pool : self . conn_pool . clone ( ) ,
514482 backoff : self . backoff . clone ( ) ,
483+ endpoint : Default :: default ( ) ,
515484 }
516485 }
517486}
@@ -541,14 +510,3 @@ fn observe_snapshot_send_spent(target: NodeId) -> impl Fn(Duration, Duration) {
541510fn snapshot_send_inflight ( target : NodeId ) -> impl FnMut ( i64 ) {
542511 move |i : i64 | raft_metrics:: network:: incr_snapshot_sendto_inflight ( & target, i)
543512}
544-
545- /// Return true if it IS an error and the error code is Unimplemented.
546- ///
547- /// Return false if it is NOT an error or the error code is NOT Unimplemented.
548- fn is_unimplemented < T > ( res : & Result < T , tonic:: Status > ) -> bool {
549- if let Err ( status) = res {
550- status. code ( ) == tonic:: Code :: Unimplemented
551- } else {
552- false
553- }
554- }
0 commit comments