Skip to content

Commit 80c25ff

Browse files
authored
feat(config): add leader election to configuration, and block mDS communication on being the leader (#1176)
1 parent 3153629 commit 80c25ff

File tree

14 files changed

+413
-61
lines changed

14 files changed

+413
-61
lines changed

Cargo.lock

Lines changed: 244 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ hyper = { version = "1.5", features = ["http2", "http1", "server"] }
101101
hyper-rustls = { version = "0.27", default-features = false, features = [
102102
"http2",
103103
"webpki-roots",
104+
"aws-lc-rs",
104105
] }
105106
ipnetwork = "0.20.0" # keep in sync with maxmind
106107
k8s-openapi.workspace = true
@@ -151,6 +152,8 @@ libflate = "2.1.0"
151152
form_urlencoded = "1.2.1"
152153
enum_dispatch = "0.3.13"
153154
gxhash = "3.4.1"
155+
kube-leader-election = "0.40.0"
156+
rustls = "0.23.25"
154157

155158
[dependencies.hyper-util]
156159
version = "0.1"
@@ -204,12 +207,12 @@ cached = { version = "0.54", default-features = false }
204207
eyre = "0.6.12"
205208
enum-map = "2.7.3"
206209
futures = "0.3.31"
207-
kube = { version = "0.98", features = [
210+
kube = { version = "0.99", features = [
208211
"runtime",
209212
"rustls-tls",
210213
"client",
211214
], default-features = false }
212-
kube-core = { version = "0.98", default-features = false, features = [
215+
kube-core = { version = "0.99", default-features = false, features = [
213216
"schema",
214217
] }
215218
k8s-openapi = { version = "0.24", features = ["v1_29", "schemars"] }

about.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,6 @@ accepted = ["ISC", "OpenSSL"]
3535

3636
[xxhash-rust]
3737
accepted = ["BSL-1.0"]
38+
39+
[aws-lc-sys]
40+
accepted = ["OpenSSL"]

crates/agones/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ futures.workspace = true
3333
k8s-openapi.workspace = true
3434
kube = { workspace = true, features = ["openssl-tls", "client", "derive", "runtime"] }
3535
quilkin = { path = "../../" }
36+
rustls = "0.23.25"
3637
serial_test = "3.2.0"
3738
tokio.workspace = true
3839
tracing.workspace = true

crates/agones/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ impl Client {
9191
/// * Removes previous test namespaces
9292
/// * Retrieves the `IMAGE_TAG` to test from env vars, and panics if it is not available.
9393
pub async fn new() -> Client {
94+
let _provider = rustls::crypto::aws_lc_rs::default_provider().install_default();
9495
let mut client = CLIENT
9596
.get_or_init(|| async {
9697
let client = kube::Client::try_default()

crates/xds/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,5 @@ tracing.workspace = true
4040
tryhard.workspace = true
4141
uuid.workspace = true
4242
url.workspace = true
43+
http = "1.3.1"
44+
tower = { workspace = true, features = ["tokio", "tracing"] }

crates/xds/src/client.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,13 @@ impl MdsClient {
225225
config: Arc<C>,
226226
is_healthy: Arc<AtomicBool>,
227227
) -> Result<DeltaSubscription, Self> {
228+
const LEADERSHIP_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
228229
let identifier = String::from(&*self.identifier);
229230

231+
while config.is_leader() == Some(false) {
232+
tokio::time::sleep(LEADERSHIP_CHECK_INTERVAL).await;
233+
}
234+
230235
let (mut ds, mut stream) =
231236
match DeltaServerStream::connect(self.client.clone(), identifier.clone()).await {
232237
Ok(ds) => {
@@ -245,6 +250,12 @@ impl MdsClient {
245250
tracing::trace!("starting relay client delta stream task");
246251

247252
loop {
253+
if config.is_leader() == Some(false) {
254+
tracing::debug!("not leader, delaying task");
255+
tokio::time::sleep(LEADERSHIP_CHECK_INTERVAL).await;
256+
continue;
257+
}
258+
248259
{
249260
let control_plane = super::server::ControlPlane::from_arc(
250261
config.clone(),
@@ -260,7 +271,16 @@ impl MdsClient {
260271
let mut stream = control_plane.delta_aggregated_resources(stream).await?;
261272
is_healthy.store(true, Ordering::SeqCst);
262273

263-
while let Some(result) = stream.next().await {
274+
loop {
275+
if config.is_leader() == Some(false) {
276+
tracing::warn!("lost leader lock mid-stream, disconnecting");
277+
break;
278+
}
279+
280+
let Some(result) = stream.next().await else {
281+
break;
282+
};
283+
264284
let response = result?;
265285
tracing::trace!("received delta discovery response");
266286
ds.send_response(response).await?;

crates/xds/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@ impl ClientTracker {
198198
pub trait Configuration: Send + Sync + Sized + 'static {
199199
fn identifier(&self) -> String;
200200

201+
/// Returns whether the current instance is considered the leader of a set
202+
/// of replicas, if leader election is enabled in a config provider.
203+
fn is_leader(&self) -> Option<bool>;
204+
201205
fn apply_delta(
202206
&self,
203207
resource_type: &str,

deny.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ ignore = [
3131
deny = [
3232
{ crate = "openssl-sys", use-instead = "rustls" },
3333
{ crate = "openssl", use-instead = "rustls" },
34-
{ crate = "cmake", use-instead = "cc" },
34+
{ crate = "cmake", use-instead = "cc", wrappers = ["aws-lc-sys"] },
3535
{ crate = "chrono", use-instead = "time", wrappers = [
3636
"k8s-openapi",
3737
"kube-client",
3838
"kube-core",
39+
"kube-leader-election",
3940
] },
4041
]
4142
multiple-versions = "deny"
@@ -46,9 +47,6 @@ skip = [
4647
]
4748
skip-tree = [
4849
{ crate = "[email protected]", reason = "matchers is using an old version, https://github.com/hawkw/matchers/pull/5, but it's also barely maintained..." },
49-
# Much like trust-dns this pulls in a ton of outdated dependencies, but it's _slightly_ better
50-
#{ crate = "[email protected]", reason = "Uses _many_ outdated crates" },
51-
{ crate = "[email protected]", reason = "several crates use this old version" },
5250
{ crate = "[email protected]", reason = "many crates use this old version" },
5351
{ crate = "[email protected]", reason = "several crates use this old version" },
5452
]
@@ -67,6 +65,7 @@ exceptions = [
6765
# This license should not really be used for code, but here we are
6866
{ crate = "notify", allow = ["CC0-1.0"] },
6967
{ crate = "webpki-roots", allow = ["MPL-2.0"] },
68+
{ crate = "aws-lc-sys", allow = ["OpenSSL"] },
7069
]
7170

7271
[[licenses.clarify]]

src/config.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ use std::{
2020
net::{IpAddr, SocketAddr},
2121
sync::{
2222
Arc,
23-
atomic::{AtomicU64, Ordering::Relaxed},
23+
atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
2424
},
2525
time::Duration,
2626
};
2727

2828
use base64_serde::base64_serde_type;
29+
use once_cell::sync::Lazy;
2930
use schemars::JsonSchema;
3031
use serde::{Deserialize, Serialize};
3132
use uuid::Uuid;
@@ -53,6 +54,21 @@ pub(crate) const BACKOFF_INITIAL_DELAY: Duration = Duration::from_millis(500);
5354

5455
pub type ConfigMap = typemap_rev::TypeMap<dyn typemap_rev::CloneDebuggableStorage>;
5556

57+
/// Shared leadership flag stored in the config typemap.
///
/// Cloning only clones the outer `Arc`, so every clone observes the same
/// underlying `AtomicBool`. The flag sits behind a `Lazy` so that
/// `DynamicConfig::leader_lock` can tell a lock that has never been accessed
/// apart from one that has (it checks `Lazy::get`).
#[derive(Debug, Clone, Default)]
58+
#[repr(transparent)]
59+
pub(crate) struct LeaderLock(Arc<Lazy<Arc<AtomicBool>>>);
60+
61+
impl LeaderLock {
62+
/// Reads the leadership flag with `Relaxed` ordering.
///
/// Dereferencing `self.0` goes through the `Lazy`; per `once_cell`'s
/// documented `Deref` behavior this initializes it on first access, so a
/// first call is expected to yield the `AtomicBool` default (`false`).
pub(crate) fn load(&self) -> bool {
63+
self.0.load(Relaxed)
64+
}
65+
66+
/// Records a new leadership state.
///
/// Reports `is_leader` to `crate::metrics::leader_election` first, then
/// writes the flag with `Relaxed` ordering.
pub(crate) fn store(&self, is_leader: bool) {
67+
crate::metrics::leader_election(is_leader);
68+
self.0.store(is_leader, Relaxed);
69+
}
70+
}
71+
5672
base64_serde_type!(pub Base64Standard, base64::engine::general_purpose::STANDARD);
5773
#[derive(Clone)]
5874
#[cfg_attr(test, derive(Debug))]
@@ -172,6 +188,8 @@ impl<'de> Deserialize<'de> for Config {
172188
});
173189
};
174190

191+
typemap.insert::<LeaderLock>(<_>::default());
192+
175193
Ok(Config {
176194
dyn_cfg: DynamicConfig {
177195
version: version.unwrap_or_default(),
@@ -309,6 +327,7 @@ impl<'de> Deserialize<'de> for DynamicConfig {
309327
});
310328
};
311329

330+
typemap.insert::<LeaderLock>(<_>::default());
312331
Ok(DynamicConfig {
313332
version: version.unwrap_or_default(),
314333
id: id.map_or_else(default_id, |id| Slot::new(Some(id))),
@@ -337,6 +356,10 @@ impl typemap_rev::TypeMapKey for Agent {
337356
type Value = Agent;
338357
}
339358

359+
// Lets a `LeaderLock` be stored in (and fetched from) the config typemap,
// keyed by its own type.
impl typemap_rev::TypeMapKey for LeaderLock {
360+
type Value = LeaderLock;
361+
}
362+
340363
impl DynamicConfig {
341364
pub fn filters(&self) -> Option<&Slot<FilterChain>> {
342365
self.typemap.get::<FilterChain>()
@@ -353,6 +376,16 @@ impl DynamicConfig {
353376
pub fn agent(&self) -> Option<&Agent> {
354377
self.typemap.get::<Agent>()
355378
}
379+
380+
/// Returns a clone of the `LeaderLock` stored in the typemap.
///
/// # Panics
///
/// Panics (via `unwrap`) if the typemap has no `LeaderLock` entry.
///
// NOTE(review): despite the name, nothing in this function touches the inner
// `Lazy`; presumably the caller's first `load`/`store` on the returned handle
// is what initializes it — confirm against callers.
pub(crate) fn init_leader_lock(&self) -> LeaderLock {
381+
self.typemap.get::<LeaderLock>().unwrap().clone()
382+
}
383+
384+
/// Returns the stored `LeaderLock` only if its inner `Lazy` has already been
/// initialized.
///
/// Yields `None` when the typemap has no `LeaderLock` entry, or when the
/// entry exists but `Lazy::get` still reports it as uninitialized.
pub(crate) fn leader_lock(&self) -> Option<&LeaderLock> {
385+
self.typemap
386+
.get::<LeaderLock>()
387+
// `Lazy::get` inspects the cell without forcing initialization.
.filter(|ll| Lazy::get(&*ll.0).is_some())
388+
}
356389
}
357390

358391
#[cfg(test)]
@@ -383,6 +416,10 @@ impl quilkin_xds::config::Configuration for Config {
383416
String::clone(&self.id())
384417
}
385418

419+
/// Returns the current value of the leader lock, or `None` when
/// `DynamicConfig::leader_lock` yields no lock.
fn is_leader(&self) -> Option<bool> {
420+
self.dyn_cfg.leader_lock().map(|ll| ll.load())
421+
}
422+
386423
fn allow_request_processing(&self, resource_type: &str) -> bool {
387424
resource_type.parse::<ResourceType>().is_ok()
388425
}

0 commit comments

Comments
 (0)