Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions benches/cluster_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ mod serde {
fn serialize_to_protobuf(cm: &ClusterMap) -> Vec<Any> {
let mut resources = Vec::new();

for cluster in cm.iter() {
resources.push(
Resource::Cluster(Cluster {
locality: cluster.key().clone().map(From::from),
endpoints: cluster
.endpoints
.iter()
.map(TryFrom::try_from)
.collect::<Result<_, _>>()
.unwrap(),
})
.try_encode()
.unwrap(),
);
for resource in cm.iter_with(|locality, endpoint_set| {
Resource::Cluster(Cluster {
locality: locality.clone().map(From::from),
endpoints: endpoint_set
.endpoints
.iter()
.map(TryFrom::try_from)
.collect::<Result<_, _>>()
.unwrap(),
})
}) {
resources.push(resource.try_encode().unwrap());
}

resources
Expand Down Expand Up @@ -111,11 +109,11 @@ mod ops {
use shared::{GenCluster, gen_cluster_map};

fn compute_hash<const S: u64>(gc: &GenCluster) -> usize {
let mut total_endpoints = 0;

for kv in gc.cm.iter() {
total_endpoints += kv.endpoints.len();
}
let total_endpoints = gc
.cm
.iter_with(|_locality, endpoint_set| endpoint_set.len())
.iter()
.sum();

assert_eq!(total_endpoints, gc.total_endpoints);
total_endpoints
Expand Down
2 changes: 1 addition & 1 deletion benches/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ pub fn gen_cluster_map<const S: u64>(token_kind: TokenKind) -> GenCluster {

// Now actually insert the endpoints, now that the order of keys is established,
// annoying, but note we split out iteration versus insertion, otherwise we deadlock
let keys: Vec<_> = cm.iter().map(|kv| kv.key().clone()).collect();
let keys = cm.iter_with(|locality, _endpoint_set| locality.clone());
let mut sets = std::collections::BTreeMap::new();

let mut token_generator = match token_kind {
Expand Down
20 changes: 12 additions & 8 deletions benches/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@ fn token_router(b: Bencher<'_, '_>, token_kind: &str) {
let filter = TokenRouter::default();
let gc = shared::gen_cluster_map::<42>(token_kind.parse().unwrap());

let mut tokens = Vec::new();

let cm = std::sync::Arc::new(gc.cm);

// Calculate the amount of bytes for all the tokens
for eps in cm.iter() {
for ep in &eps.value().endpoints {
for tok in ep.metadata.known.tokens.iter() {
tokens.push(tok.clone());
let tokens: Vec<Vec<u8>> = cm
.iter_with(|_locality, endpoint_set| {
let mut tokens: Vec<Vec<u8>> = Vec::new();
for ep in &endpoint_set.endpoints {
for tok in ep.metadata.known.tokens.iter() {
tokens.push(tok.clone());
}
}
}
}
tokens
})
.into_iter()
.flatten()
.collect();

let total_token_size: usize = tokens.iter().map(|t| t.len()).sum();
let pool = std::sync::Arc::new(quilkin::collections::BufferPool::new(1, 1));
Expand Down
15 changes: 10 additions & 5 deletions crates/test/tests/mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,17 @@ trace_test!(datacenter_discovery, {
) -> bool {
let dcs = config.dyn_cfg.datacenters().unwrap().read();

for i in dcs.iter() {
dbg!(which, i.key(), i.value());
}
drop(dcs.iter_with(|key, value| {
dbg!(which, key, value);
}));

let entries = dcs
.iter_with(|ip_addr, dc| (ip_addr.clone(), dc.clone()))
.into_iter()
.collect::<std::collections::HashMap<std::net::IpAddr, quilkin::config::Datacenter>>();

let ipv4_dc = dcs.get(&std::net::Ipv4Addr::LOCALHOST.into());
let ipv6_dc = dcs.get(&std::net::Ipv6Addr::LOCALHOST.into());
let ipv4_dc = entries.get(&std::net::Ipv4Addr::LOCALHOST.into());
let ipv6_dc = entries.get(&std::net::Ipv6Addr::LOCALHOST.into());

if counter > 0 {
match (ipv4_dc, ipv6_dc) {
Expand Down
2 changes: 1 addition & 1 deletion crates/xds/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl DeltaClientStream {
) -> Result<()> {
crate::metrics::actions_total(KIND_CLIENT, "refresh").inc();
for (rt, names) in subs {
let initial_resource_versions = local.get(rt).clone();
let initial_resource_versions = local.get_versions(rt);
self.req_tx
.send(DeltaDiscoveryRequest {
node: Some(Node {
Expand Down
43 changes: 29 additions & 14 deletions crates/xds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl LocalVersions {
}

#[inline]
pub fn get(&self, ty: &str) -> parking_lot::MutexGuard<'_, VersionMap> {
fn get_map_guard(&self, ty: &str) -> parking_lot::MutexGuard<'_, VersionMap> {
let g = self
.versions
.iter()
Expand All @@ -53,6 +53,33 @@ impl LocalVersions {
panic!("unable to retrieve `{ty}` versions, available versions are {versions:?}");
}
}

#[inline]
/// Applies a delta of resource versions for `type_url` under a single
/// acquisition of the per-type mutex.
///
/// Removals are applied before insertions so that a resource appearing in
/// both lists ends up with its updated version (though an upstream server
/// sending both would itself be buggy).
pub fn update_versions(
    &self,
    type_url: &str,
    // `&[String]` instead of `&Vec<String>` (clippy `ptr_arg`): callers
    // passing `&vec` still work via deref coercion.
    removed_resources: &[String],
    updated_resources: Vec<(String, String)>,
) {
    let mut guard = self.get_map_guard(type_url);

    // Remove any resources the upstream server has removed/doesn't have,
    // we do this before applying any new/updated resources in case a
    // resource is in both lists, though really that would be a bug in
    // the upstream server
    for removed in removed_resources {
        guard.remove(removed);
    }

    for (k, v) in updated_resources {
        guard.insert(k, v);
    }
}

#[inline]
/// Returns a clone of the current version map for `type_url`.
///
/// The mutex guard returned by `get_map_guard` is a temporary that is
/// dropped at the end of the expression, so the caller never holds the
/// lock on the returned data.
///
/// Panics (inside `get_map_guard`) if `type_url` is not a known type.
pub fn get_versions(&self, type_url: &str) -> VersionMap {
    self.get_map_guard(type_url).clone()
}
}

pub struct ClientState {
Expand Down Expand Up @@ -268,19 +295,7 @@ pub fn handle_delta_discovery_responses<C: Configuration>(
let res = config.apply_delta(&type_url, response.resources, &response.removed_resources, remote_addr);

if res.is_ok() {
let mut lock = local.get(&type_url);

// Remove any resources the upstream server has removed/doesn't have,
// we do this before applying any new/updated resources in case a
// resource is in both lists, though really that would be a bug in
// the upstream server
for removed in response.removed_resources {
lock.remove(&removed);
}

for (k, v) in version_map {
lock.insert(k, v);
}
local.update_versions(&type_url, &response.removed_resources, version_map);
}

res
Expand Down
127 changes: 71 additions & 56 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,32 +594,40 @@ impl Config {
}

if let Some(datacenters) = self.dyn_cfg.datacenters() {
for entry in datacenters.read().iter() {
let host = entry.key().to_string();
let qcmp_port = entry.qcmp_port;
let version =
resource_version(entry.icao_code.to_string().as_str(), qcmp_port);

if client_state.version_matches(&host, &version) {
continue;
}
let dc_resource_transformer =
|ip_addr: &std::net::IpAddr,
dc: &Datacenter|
-> eyre::Result<Option<XdsResource>> {
let host = ip_addr.to_string();
let qcmp_port = dc.qcmp_port;
let version =
resource_version(dc.icao_code.to_string().as_str(), qcmp_port);

if client_state.version_matches(&host, &version) {
return Ok(None);
}

let resource = crate::xds::Resource::Datacenter(
crate::net::cluster::proto::Datacenter {
qcmp_port: qcmp_port as _,
icao_code: entry.icao_code.to_string(),
host: host.clone(),
},
);
let resource = crate::xds::Resource::Datacenter(
crate::net::cluster::proto::Datacenter {
qcmp_port: qcmp_port as _,
icao_code: dc.icao_code.to_string(),
host: host.clone(),
},
);

Ok(Some(XdsResource {
name: host,
version,
resource: Some(resource.try_encode()?),
aliases: Vec::new(),
ttl: None,
cache_control: None,
}))
};

resources.push(XdsResource {
name: host,
version,
resource: Some(resource.try_encode()?),
aliases: Vec::new(),
ttl: None,
cache_control: None,
});
for resource in datacenters.read().iter_with(dc_resource_transformer) {
let Some(resource) = resource? else { continue };
resources.push(resource);
}

{
Expand All @@ -628,48 +636,51 @@ impl Config {
let Ok(addr) = key.parse() else {
continue;
};
if dc.get(&addr).is_none() {
if !dc.exists(&addr) {
removed.insert(key.clone());
}
}
}
}
}
ResourceType::Cluster => {
let mut push = |key: &Option<crate::net::endpoint::Locality>,
value: &crate::net::cluster::EndpointSet|
-> crate::Result<()> {
let version = value.version().to_string();
let key_s = key.as_ref().map(|k| k.to_string()).unwrap_or_default();

if client_state.version_matches(&key_s, &version) {
return Ok(());
}

let resource = crate::xds::Resource::Cluster(
quilkin_xds::generated::quilkin::config::v1alpha1::Cluster {
locality: key.clone().map(|l| l.into()),
endpoints: value.endpoints.iter().map(|ep| ep.into()).collect(),
},
);
let cluster_resource_transformer =
|key: &Option<crate::net::endpoint::Locality>,
value: &crate::net::cluster::EndpointSet|
-> crate::Result<Option<XdsResource>> {
let version = value.version().to_string();
let key_s = key.as_ref().map(|k| k.to_string()).unwrap_or_default();

if client_state.version_matches(&key_s, &version) {
return Ok(None);
}

resources.push(XdsResource {
name: key_s,
version,
resource: Some(resource.try_encode()?),
..Default::default()
});
let resource = crate::xds::Resource::Cluster(
quilkin_xds::generated::quilkin::config::v1alpha1::Cluster {
locality: key.clone().map(|l| l.into()),
endpoints: value.endpoints.iter().map(|ep| ep.into()).collect(),
},
);

Ok(())
};
Ok(Some(XdsResource {
name: key_s,
version,
resource: Some(resource.try_encode()?),
..Default::default()
}))
};

let Some(clusters) = self.dyn_cfg.clusters() else {
break 'append;
};

if client_state.subscribed.is_empty() {
for cluster in clusters.read().iter() {
push(cluster.key(), cluster.value())?;
for cluster_resources in
clusters.read().iter_with(cluster_resource_transformer)
{
if let Some(clr) = cluster_resources? {
resources.push(clr);
}
}
} else {
for locality in client_state.subscribed.iter().filter_map(|name| {
Expand All @@ -679,18 +690,22 @@ impl Config {
name.parse().ok().map(Some)
}
}) {
if let Some(cluster) = clusters.read().get(&locality) {
push(cluster.key(), cluster.value())?;
if let Some(cluster_resource) =
clusters.read().with_value(&locality, |entry| {
cluster_resource_transformer(entry.key(), entry.value())
})
{
if let Some(clr) = cluster_resource? {
resources.push(clr);
}
}
}
};

// Currently, we have exactly _one_ special case for removed resources, which
// is when ClusterMap::update_unlocated_endpoints is called to move the None
// locality endpoints to another one, so we just detect that case manually
if client_state.versions.contains_key("")
&& clusters.read().get(&None).is_none()
{
if client_state.versions.contains_key("") && !clusters.read().exists(&None) {
removed.insert("".into());
}
}
Expand Down
23 changes: 17 additions & 6 deletions src/config/datacenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,24 @@ impl DatacenterMap {
}

#[inline]
pub fn get(&self, key: &IpAddr) -> Option<dashmap::mapref::one::Ref<'_, IpAddr, Datacenter>> {
self.map.get(key)
/// Returns `true` when `key` has an entry in the map.
///
/// Uses `contains_key` directly rather than `get(..).is_some()`, which
/// avoids materializing (and immediately dropping) an entry guard.
pub fn exists(&self, key: &IpAddr) -> bool {
    self.map.contains_key(key)
}

/// Iterates over the entries in the `DatacenterMap` with the given func
///
/// This ensures that the dashmap entry references are never held across await boundaries as
/// the func cannot be async.
#[inline]
pub fn iter(&self) -> dashmap::iter::Iter<'_, IpAddr, Datacenter> {
self.map.iter()
pub fn iter_with<F, T>(&self, func: F) -> Vec<T>
where
    F: for<'a> Fn(&'a IpAddr, &'a Datacenter) -> T,
{
    // Iterator chain instead of a manual push loop: `collect` can reserve
    // capacity from the iterator's size hint, and the entry guard is
    // confined to the closure, never escaping this call.
    self.map
        .iter()
        .map(|entry| func(entry.key(), entry.value()))
        .collect()
}

#[inline]
Expand Down Expand Up @@ -113,8 +124,8 @@ impl PartialEq for DatacenterMap {
return false;
}

for a in self.iter() {
match rhs.get(a.key()).filter(|b| *a.value() == **b) {
for a in self.map.iter() {
match rhs.map.get(a.key()).filter(|b| *a.value() == **b) {
Some(_) => {}
None => return false,
}
Expand Down
Loading
Loading