Skip to content

Commit 4ba1d36

Browse files
committed
refactor DatacenterMap to never hand out dashmap references
1 parent 303dbd7 commit 4ba1d36

File tree

4 files changed

+64
-40
lines changed

4 files changed

+64
-40
lines changed

crates/test/tests/mesh.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,17 @@ trace_test!(datacenter_discovery, {
189189
) -> bool {
190190
let dcs = config.dyn_cfg.datacenters().unwrap().read();
191191

192-
for i in dcs.iter() {
193-
dbg!(which, i.key(), i.value());
194-
}
192+
drop(dcs.iter_with(|key, value| {
193+
dbg!(which, key, value);
194+
}));
195+
196+
let entries = dcs
197+
.iter_with(|ip_addr, dc| (ip_addr.clone(), dc.clone()))
198+
.into_iter()
199+
.collect::<std::collections::HashMap<std::net::IpAddr, quilkin::config::Datacenter>>();
195200

196-
let ipv4_dc = dcs.get(&std::net::Ipv4Addr::LOCALHOST.into());
197-
let ipv6_dc = dcs.get(&std::net::Ipv6Addr::LOCALHOST.into());
201+
let ipv4_dc = entries.get(&std::net::Ipv4Addr::LOCALHOST.into());
202+
let ipv6_dc = entries.get(&std::net::Ipv6Addr::LOCALHOST.into());
198203

199204
if counter > 0 {
200205
match (ipv4_dc, ipv6_dc) {

src/config.rs

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -594,32 +594,40 @@ impl Config {
594594
}
595595

596596
if let Some(datacenters) = self.dyn_cfg.datacenters() {
597-
for entry in datacenters.read().iter() {
598-
let host = entry.key().to_string();
599-
let qcmp_port = entry.qcmp_port;
600-
let version =
601-
resource_version(entry.icao_code.to_string().as_str(), qcmp_port);
602-
603-
if client_state.version_matches(&host, &version) {
604-
continue;
605-
}
597+
let dc_resource_transformer =
598+
|ip_addr: &std::net::IpAddr,
599+
dc: &Datacenter|
600+
-> eyre::Result<Option<XdsResource>> {
601+
let host = ip_addr.to_string();
602+
let qcmp_port = dc.qcmp_port;
603+
let version =
604+
resource_version(dc.icao_code.to_string().as_str(), qcmp_port);
605+
606+
if client_state.version_matches(&host, &version) {
607+
return Ok(None);
608+
}
606609

607-
let resource = crate::xds::Resource::Datacenter(
608-
crate::net::cluster::proto::Datacenter {
609-
qcmp_port: qcmp_port as _,
610-
icao_code: entry.icao_code.to_string(),
611-
host: host.clone(),
612-
},
613-
);
610+
let resource = crate::xds::Resource::Datacenter(
611+
crate::net::cluster::proto::Datacenter {
612+
qcmp_port: qcmp_port as _,
613+
icao_code: dc.icao_code.to_string(),
614+
host: host.clone(),
615+
},
616+
);
617+
618+
Ok(Some(XdsResource {
619+
name: host,
620+
version,
621+
resource: Some(resource.try_encode()?),
622+
aliases: Vec::new(),
623+
ttl: None,
624+
cache_control: None,
625+
}))
626+
};
614627

615-
resources.push(XdsResource {
616-
name: host,
617-
version,
618-
resource: Some(resource.try_encode()?),
619-
aliases: Vec::new(),
620-
ttl: None,
621-
cache_control: None,
622-
});
628+
for resource in datacenters.read().iter_with(dc_resource_transformer) {
629+
let Some(resource) = resource? else { continue };
630+
resources.push(resource);
623631
}
624632

625633
{
@@ -628,7 +636,7 @@ impl Config {
628636
let Ok(addr) = key.parse() else {
629637
continue;
630638
};
631-
if dc.get(&addr).is_none() {
639+
if !dc.exists(&addr) {
632640
removed.insert(key.clone());
633641
}
634642
}

src/config/datacenter.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,24 @@ impl DatacenterMap {
3737
}
3838

3939
#[inline]
40-
pub fn get(&self, key: &IpAddr) -> Option<dashmap::mapref::one::Ref<'_, IpAddr, Datacenter>> {
41-
self.map.get(key)
40+
pub fn exists(&self, key: &IpAddr) -> bool {
41+
self.map.get(key).is_some()
4242
}
4343

44+
/// Iterates over the entries in the `DatacenterMap` with the given func
45+
///
46+
/// This ensures that the dashmap entry references are never held across await boundaries as
47+
/// the func cannot be async.
4448
#[inline]
45-
pub fn iter(&self) -> dashmap::iter::Iter<'_, IpAddr, Datacenter> {
46-
self.map.iter()
49+
pub fn iter_with<F, T>(&self, func: F) -> Vec<T>
50+
where
51+
F: for<'a> Fn(&'a IpAddr, &'a Datacenter) -> T,
52+
{
53+
let mut results: Vec<T> = Vec::new();
54+
for entry in self.map.iter() {
55+
results.push(func(entry.key(), entry.value()));
56+
}
57+
results
4758
}
4859

4960
#[inline]
@@ -113,8 +124,8 @@ impl PartialEq for DatacenterMap {
113124
return false;
114125
}
115126

116-
for a in self.iter() {
117-
match rhs.get(a.key()).filter(|b| *a.value() == **b) {
127+
for a in self.map.iter() {
128+
match rhs.map.get(a.key()).filter(|b| *a.value() == **b) {
118129
Some(_) => {}
119130
None => return false,
120131
}

src/net/phoenix.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -388,10 +388,10 @@ impl<M> Phoenix<M> {
388388
self.nodes.remove(&removed);
389389
}
390390

391-
for entry in dcs.iter() {
392-
let addr = (*entry.key(), entry.value().qcmp_port).into();
393-
self.add_node_if_not_exists(addr, entry.value().icao_code);
394-
}
391+
drop(dcs.iter_with(|ip_addr, dc| {
392+
let socket_addr = (*ip_addr, dc.qcmp_port).into();
393+
self.add_node_if_not_exists(socket_addr, dc.icao_code);
394+
}));
395395
}
396396
}
397397

0 commit comments

Comments
 (0)