Skip to content

Commit 1157fbd

Browse files
feat: Clean up Datacenters which fail to respond to qcmp pings (#1267)
A proxy that is connected to a relay may miss an update where a Datacenter is removed if there is a temporary disruption. This adds a channel where the phoenix subsystem can report "bad nodes" that it has failed to ping for a period, so that they can be cleaned up from the Config. --------- Co-authored-by: XAMPPRocky <[email protected]>
1 parent 5f1e457 commit 1157fbd

File tree

11 files changed

+232
-67
lines changed

11 files changed

+232
-67
lines changed

crates/test/src/lib.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ impl Pail {
368368
let config_path = path.clone();
369369

370370
let (shutdown, shutdown_rx) = quilkin::signal::channel();
371+
let pail_token = quilkin::signal::cancellation_token(shutdown_rx.clone());
371372

372373
let providers = quilkin::Providers::default().fs().fs_path(path);
373374
let svc = quilkin::Service::default()
@@ -376,12 +377,13 @@ impl Pail {
376377
.mds()
377378
.mds_port(mds_port);
378379

379-
let config = Arc::new(crate::Config::new(
380+
let config = crate::Config::new_rc(
380381
Some("test-relay".into()),
381382
Default::default(),
382383
&providers,
383384
&svc,
384-
));
385+
pail_token,
386+
);
385387

386388
*config.dyn_cfg.id.lock() = spc.name.into();
387389
let healthy = Arc::new(std::sync::atomic::AtomicBool::new(false));
@@ -452,6 +454,7 @@ impl Pail {
452454
.collect::<Vec<_>>();
453455

454456
let (shutdown, shutdown_rx) = quilkin::signal::channel();
457+
let pail_token = quilkin::signal::cancellation_token(shutdown_rx.clone());
455458

456459
let port = quilkin::net::socket_port(
457460
&quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"),
@@ -464,12 +467,13 @@ impl Pail {
464467
.fs_path(path)
465468
.grpc_push_endpoints(relay_servers);
466469

467-
let config = Arc::new(crate::Config::new(
470+
let config = crate::Config::new_rc(
468471
Some("test-agent".into()),
469472
apc.icao_code,
470473
&providers,
471474
&svc,
472-
));
475+
pail_token,
476+
);
473477

474478
*config.dyn_cfg.id.lock() = spc.name.into();
475479
let acfg = config.clone();
@@ -539,12 +543,16 @@ impl Pail {
539543
.phoenix_port(phoenix_port)
540544
.termination_timeout(None);
541545

542-
let config = Arc::new(crate::Config::new(
546+
let (shutdown, shutdown_rx) = quilkin::signal::channel();
547+
let pail_token = quilkin::signal::cancellation_token(shutdown_rx.clone());
548+
549+
let config = crate::Config::new_rc(
543550
Some("test-proxy".into()),
544551
Default::default(),
545552
&Default::default(),
546553
&svc,
547-
));
554+
pail_token,
555+
);
548556

549557
if let Some(cfg) = ppc.config {
550558
if !cfg.clusters.is_empty() {
@@ -581,7 +589,6 @@ impl Pail {
581589
*config.dyn_cfg.id.lock() = spc.name.into();
582590

583591
let (rttx, rtrx) = tokio::sync::mpsc::unbounded_channel();
584-
let (shutdown, shutdown_rx) = quilkin::signal::channel();
585592

586593
let healthy = Arc::new(std::sync::atomic::AtomicBool::new(false));
587594
let provider_task = providers.spawn_providers(

src/cli.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,18 +257,19 @@ impl Cli {
257257
tracing::debug!(cli = ?self, "config parameters");
258258

259259
let locality = self.locality.locality();
260+
let shutdown_handler = crate::signal::spawn_handler();
261+
let drive_token = crate::signal::cancellation_token(shutdown_handler.shutdown_rx());
260262

261-
let mut config = crate::Config::new(
263+
let config = crate::Config::new_rc(
262264
self.service.id.clone(),
263265
self.locality.icao_code,
264266
&self.providers,
265267
&self.service,
268+
drive_token.child_token(),
266269
);
267270
config.read_config(&self.config, locality.clone())?;
268-
let config = Arc::new(config);
269271

270272
let ready = Arc::<std::sync::atomic::AtomicBool>::default();
271-
let shutdown_handler = crate::signal::spawn_handler();
272273
if self.admin.enabled {
273274
crate::components::admin::server(
274275
config.clone(),

src/config.rs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,15 @@ impl LeaderLock {
7474
}
7575
}
7676

77+
pub type BadNodeInformer = tokio::sync::mpsc::UnboundedSender<std::net::SocketAddr>;
78+
7779
base64_serde_type!(pub Base64Standard, base64::engine::general_purpose::STANDARD);
7880
#[derive(Clone)]
7981
#[cfg_attr(test, derive(Debug))]
8082
pub struct Config {
8183
pub dyn_cfg: DynamicConfig,
84+
bad_node_informer: Option<BadNodeInformer>,
85+
cancellation_token: Option<tokio_util::sync::CancellationToken>,
8286
}
8387

8488
#[cfg(test)]
@@ -356,7 +360,18 @@ fn resolve_id(id: Option<String>) -> String {
356360
.unwrap_or_else(uuid)
357361
}
358362

363+
impl Drop for Config {
364+
fn drop(&mut self) {
365+
if let Some(token) = &self.cancellation_token {
366+
token.cancel();
367+
}
368+
}
369+
}
370+
359371
impl Config {
372+
/// Creates and initializes a new Config
373+
///
374+
/// This constructor is mainly intended for tests
360375
pub fn new(
361376
id: Option<String>,
362377
icao_code: IcaoCode,
@@ -370,15 +385,84 @@ impl Config {
370385
icao_code: NotifyingIcaoCode::new(icao_code),
371386
typemap: default_typemap(),
372387
},
388+
bad_node_informer: None,
389+
cancellation_token: None,
373390
};
374391
providers.init_config(&mut config);
375392
service.init_config(&mut config);
376393

377394
config
378395
}
379396

397+
/// Creates and initializes a new Arc<Config>
398+
///
399+
/// Spawns a tokio task as a side effect that will be stopped when the cancellation token is
400+
/// cancelled. The token _will_ be cancelled when Config is dropped, so make sure to pass in a
401+
/// child token if the token is intended to live longer than the Config.
402+
pub fn new_rc(
403+
id: Option<String>,
404+
icao_code: IcaoCode,
405+
providers: &crate::Providers,
406+
service: &crate::Service,
407+
cancellation_token: tokio_util::sync::CancellationToken,
408+
) -> Arc<Self> {
409+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<std::net::SocketAddr>();
410+
411+
let mut config = Self::new(id, icao_code, providers, service);
412+
config.cancellation_token = Some(cancellation_token);
413+
config.bad_node_informer = Some(tx);
414+
415+
let config = Arc::new(config);
416+
config
417+
.spawn_janitor(rx)
418+
.expect("spawn_janitor() from new_rc() should not fail");
419+
420+
config
421+
}
422+
423+
/// Spawns a janitor task to help out with cleaning stale Datacenter entries from the Config
424+
fn spawn_janitor(
425+
self: &Arc<Config>,
426+
mut rx: tokio::sync::mpsc::UnboundedReceiver<std::net::SocketAddr>,
427+
) -> eyre::Result<()> {
428+
let cancellation_token = self
429+
.cancellation_token
430+
.as_ref()
431+
.ok_or_else(|| eyre::eyre!("cancellation token not set"))?
432+
.clone();
433+
434+
// Ensure that we don't keep an owning reference to Arc<Config> so that Drop can occur
435+
let janitor_config_ref = Arc::downgrade(self);
436+
tokio::spawn(async move {
437+
loop {
438+
tokio::select! {
439+
_ = cancellation_token.cancelled() => {
440+
break;
441+
}
442+
node = rx.recv() => {
443+
if let Some(node) = node {
444+
let ip = node.ip();
445+
if let Some(config) = janitor_config_ref.upgrade() {
446+
if let Some(datacenters) = config.dyn_cfg.datacenters() {
447+
datacenters.modify(|wg| {
448+
tracing::warn!(%ip, "removing datacenter from local state");
449+
wg.remove(ip);
450+
});
451+
}
452+
} else {
453+
break;
454+
}
455+
}
456+
}
457+
}
458+
}
459+
tracing::trace!("Stopping janitor task");
460+
});
461+
Ok(())
462+
}
463+
380464
pub fn read_config(
381-
&mut self,
465+
self: &Arc<Self>,
382466
config_path: &std::path::Path,
383467
locality: Option<crate::net::endpoint::Locality>,
384468
) -> Result<(), eyre::Error> {
@@ -409,6 +493,10 @@ impl Config {
409493
Ok(())
410494
}
411495

496+
pub fn bad_node_informer(&self) -> Option<BadNodeInformer> {
497+
self.bad_node_informer.clone()
498+
}
499+
412500
/// Given a list of subscriptions and the current state of the calling client,
413501
/// construct a response with the current state of our resources that differ
414502
/// from those of the client

0 commit comments

Comments
 (0)