Skip to content

Commit

Permalink
build(deps): bump kubert to v0.23.0 (#13533)
Browse files Browse the repository at this point in the history
kubert v0.23.0 includes updates to several core dependencies, including:

* http v1
* hyper v1
* kube v0.98
* prometheus-client v0.23
* tonic v0.12
* tower v0.5

This change updates the policy controller to use these new versions.

The k8s-gateway-api dependency is temporarily pinned to a git repository
in anticipation of adopting an alternate crate backend.
  • Loading branch information
olix0r authored Feb 19, 2025
1 parent 40d0d39 commit f60fc0c
Show file tree
Hide file tree
Showing 13 changed files with 523 additions and 616 deletions.
984 changes: 440 additions & 544 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 12 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@ members = [
lto = "thin"

[workspace.dependencies]
http = "0.2"
hyper = { version = "0.14" }
k8s-openapi = { version = "0.20", features = ["v1_22"] }
kube = { version = "0.87.1", default-features = false }
kubert = { version = "0.22", default-features = false }
prometheus-client = { version = "0.22", default-features = false }
tonic = { version = "0.10", default-features = false }
http = "1"
hyper = "1"
k8s-openapi = { version = "0.24", features = ["v1_31"] }
kube = { version = "0.98", default-features = false }
kubert = { version = "0.23", default-features = false }
prometheus-client = { version = "0.23", default-features = false }
tonic = { version = "0.12", default-features = false }
tower = { version = "0.5", default-features = false }

[workspace.dependencies.k8s-gateway-api]
version = "0.16"
# TODO(ver): Remove this once we update to a proper generated version of the gateway api bindings.
git = "https://github.com/linkerd/k8s-gateway-api-rs"
features = ["experimental"]

[workspace.dependencies.linkerd2-proxy-api]
version = "0.15"
git = "https://github.com/linkerd/linkerd2-proxy-api"
branch = "main"
features = [
"inbound",
"outbound",
Expand Down
20 changes: 7 additions & 13 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ targets = [
db-path = "~/.cargo/advisory-db"
db-urls = ["https://github.com/rustsec/advisory-db"]
ignore = [
# Pending kube update
# instant is unmaintained, but pulled in via kube
"RUSTSEC-2024-0384",
"RUSTSEC-2024-0388",
]

[licenses]
Expand All @@ -22,6 +21,7 @@ allow = [
"ISC",
"MIT",
"Unicode-3.0",
"Zlib",
]
confidence-threshold = 0.8
exceptions = [
Expand All @@ -30,10 +30,6 @@ exceptions = [
"MIT",
"OpenSSL",
], name = "ring", version = "*" },

{ allow = [
"Zlib",
], name = "adler32" },
]

[[licenses.clarify]]
Expand All @@ -48,12 +44,6 @@ multiple-versions = "deny"
wildcards = "allow"
highlight = "all"
skip = [
# `rustls-pemfile` and `k8s-openapi` depend on versions of `base64` that
# have diverged significantly.
{ name = "base64" },
# `tower-http` (a transitive dep via `kubert`) depends on v2.x of `bitflags`,
# while pretty much the entire rest of the world is still on v1.x
{ name = "bitflags", version = "1.0" },
# https://github.com/hawkw/matchers/pull/4
{ name = "regex-automata", version = "0.1" },
{ name = "regex-syntax", version = "0.6" },
Expand All @@ -66,6 +56,8 @@ skip-tree = [
{ name = "thiserror", version = "1" },
# rand v0.9 is still making its way through the ecosystem
{ name = "rand", version = "0.8" },
# https://github.com/hyperium/tonic/issues/2135
{ name = "tonic", version = "0.12" },
]

[sources]
Expand All @@ -75,4 +67,6 @@ allow-registry = ["https://github.com/rust-lang/crates.io-index"]
allow-git = []

[sources.allow-org]
github = []
github = [
"linkerd",
]
4 changes: 2 additions & 2 deletions policy-controller/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ async-trait = "0.1"
http = { workspace = true }
drain = "0.1"
futures = { version = "0.3", default-features = false }
hyper = { workspace = true, features = ["http2", "server", "tcp"] }
hyper = { workspace = true, features = ["http2", "server"] }
linkerd-policy-controller-core = { path = "../core" }
maplit = "1"
prost-types = "0.12.6"
prost-types = "0.13"
tokio = { version = "1", features = ["macros"] }
tonic = { workspace = true }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/grpc/src/outbound/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub(crate) fn protocol(
}),
http1: Some(outbound::proxy_protocol::Http1 {
routes: routes.clone(),
failure_accrual: accrual.clone(),
failure_accrual: accrual,
}),
http2: Some(outbound::proxy_protocol::Http2 {
routes,
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/k8s/status/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ kubert = { workspace = true, default-features = false, features = [
linkerd-policy-controller-core = { path = "../../core" }
linkerd-policy-controller-k8s-api = { path = "../api" }
parking_lot = "0.12"
prometheus-client = { workspace = true, default-features = false }
prometheus-client = { workspace = true }
serde = "1"
serde_json = "1.0.138"
thiserror = "2"
Expand Down
3 changes: 1 addition & 2 deletions policy-controller/k8s/status/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ impl ControllerMetrics {
patch_timeout.clone(),
);

let patch_duration =
Histogram::new([0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0].into_iter());
let patch_duration = Histogram::new([0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]);
prom.register_with_unit(
"patch_duration",
"Histogram of time taken to apply patch operations",
Expand Down
9 changes: 7 additions & 2 deletions policy-controller/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ rustls-tls = ["kube/rustls-tls"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
bytes = "1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
k8s-openapi = { workspace = true }
hyper = { workspace = true, features = ["http1", "http2", "runtime", "server"] }
http-body-util = "0.1"
hyper = { workspace = true, features = ["http1", "http2", "server"] }
ipnet = { version = "2", default-features = false }
openssl = { version = "0.10.71", optional = true }
parking_lot = "0.12"
prometheus-client = { workspace = true, default-features = false }
prometheus-client = { workspace = true }
serde = "1"
serde_json = "1"
thiserror = "2"
tokio-stream = { version = "0.1", features = ["sync"] }
tower = { workspace = true }
tracing = "0.1"
regex = "1"

Expand Down Expand Up @@ -56,6 +59,8 @@ features = [
"lease",
"prometheus-client",
"runtime",
# TODO(ver): enable runtime diagnostics
#"runtime-diagnostics",
"server",
"rustls-tls",
]
Expand Down
12 changes: 8 additions & 4 deletions policy-controller/runtime/src/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::k8s::policy::{
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use futures::future;
use hyper::{body::Buf, http, Body, Request, Response};
use http_body_util::BodyExt;
use hyper::{http, Request, Response};
use k8s_openapi::api::core::v1::{Namespace, ServiceAccount};
use kube::{core::DynamicObject, Resource, ResourceExt};
use linkerd_policy_controller_k8s_api::gateway;
Expand Down Expand Up @@ -46,9 +47,11 @@ trait Validate<T> {
) -> Result<()>;
}

type Body = http_body_util::Full<bytes::Bytes>;

// === impl AdmissionService ===

impl hyper::service::Service<Request<Body>> for Admission {
impl tower::Service<Request<hyper::body::Incoming>> for Admission {
type Response = Response<Body>;
type Error = Error;
type Future = future::BoxFuture<'static, Result<Response<Body>, Error>>;
Expand All @@ -60,7 +63,7 @@ impl hyper::service::Service<Request<Body>> for Admission {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<hyper::body::Incoming>) -> Self::Future {
trace!(?req);
if req.method() != http::Method::POST || req.uri().path() != "/" {
return Box::pin(future::ok(
Expand All @@ -73,7 +76,8 @@ impl hyper::service::Service<Request<Body>> for Admission {

let admission = self.clone();
Box::pin(async move {
let bytes = hyper::body::aggregate(req.into_body()).await?;
use bytes::Buf;
let bytes = req.into_body().collect().await?.to_bytes();
let review: Review = match serde_json::from_reader(bytes.reader()) {
Ok(review) => review,
Err(error) => {
Expand Down
41 changes: 22 additions & 19 deletions policy-controller/runtime/src/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,26 @@ use tokio::{sync::watch, time};
const LEASE_DURATION: time::Duration = time::Duration::from_secs(30);
const LEASE_NAME: &str = "policy-controller-write";
const RENEW_GRACE_PERIOD: time::Duration = time::Duration::from_secs(1);
const FIELD_MANAGER: &str = "policy-controller";

pub async fn init<T>(
runtime: &kubert::Runtime<T>,
ns: &str,
namespace: &str,
deployment_name: &str,
hostname: &str,
claimant: &str,
) -> Result<watch::Receiver<Arc<kubert::lease::Claim>>> {
let params = kubert::LeaseParams {
name: LEASE_NAME.to_string(),
namespace: namespace.to_string(),
claimant: claimant.to_string(),
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
field_manager: Some(FIELD_MANAGER.into()),
};

// Fetch the policy-controller deployment so that we can use it as an owner
// reference of the Lease.
let api = k8s::Api::<Deployment>::namespaced(runtime.client(), ns);
let api = k8s::Api::<Deployment>::namespaced(runtime.client(), namespace);
let mut tries = 3;
let deployment = loop {
tries -= 1;
Expand All @@ -42,8 +52,8 @@ pub async fn init<T>(

let patch = kube::api::Patch::Apply(coordv1::Lease {
metadata: ObjectMeta {
name: Some(LEASE_NAME.to_string()),
namespace: Some(ns.to_string()),
name: Some(params.name.clone()),
namespace: Some(params.namespace.clone()),
// Specifying a resource version of "0" means that we will
// only create the Lease if it does not already exist.
resource_version: Some("0".to_string()),
Expand All @@ -54,7 +64,10 @@ pub async fn init<T>(
"linkerd.io/control-plane-component".to_string(),
"destination".to_string(),
),
("linkerd.io/control-plane-ns".to_string(), ns.to_string()),
(
"linkerd.io/control-plane-ns".to_string(),
params.namespace.clone(),
),
]
.into_iter()
.collect(),
Expand All @@ -67,7 +80,7 @@ pub async fn init<T>(
field_manager: Some("policy-controller".to_string()),
..Default::default()
};
let api = k8s::Api::<coordv1::Lease>::namespaced(runtime.client(), ns);
let api = k8s::Api::<coordv1::Lease>::namespaced(runtime.client(), namespace);

// An individual request may timeout or hit a transient error, so we try up to 3 times with a brief pause.
let mut tries = 3;
Expand Down Expand Up @@ -96,16 +109,6 @@ pub async fn init<T>(
time::sleep(time::Duration::from_secs(1)).await;
}

// Create the lease manager used for trying to claim the policy
// controller write lease.
// todo: Do we need to use LeaseManager::field_manager here?
let params = kubert::lease::ClaimParams {
lease_duration: LEASE_DURATION,
renew_grace_period: RENEW_GRACE_PERIOD,
};
let (claims, _task) = kubert::lease::LeaseManager::init(api, LEASE_NAME)
.await?
.spawn(hostname, params)
.await?;
Ok(claims)
let (claim, _task) = runtime.spawn_lease(params).await?;
Ok(claim)
}
4 changes: 4 additions & 0 deletions policy-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ publish = false

[dependencies]
anyhow = "1"
bytes = "1"
http-body-util = "0.1"
hyper = { workspace = true, features = ["client", "http2"] }
hyper-util = { version = "0.1" }
futures = { version = "0.3", default-features = false }
ipnet = "2"
k8s-gateway-api = { workspace = true }
Expand All @@ -22,6 +25,7 @@ serde_json = "1"
schemars = "0.8"
tonic = { workspace = true }
tokio = { version = "1", features = ["macros", "rt"] }
tower = { workspace = true }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand Down
21 changes: 12 additions & 9 deletions policy-test/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! forwarding to connect to a running instance.
use anyhow::Result;
use futures::{future, prelude::*};
pub use linkerd2_proxy_api::*;
use linkerd2_proxy_api::{
inbound::inbound_server_policies_client::InboundServerPoliciesClient,
Expand Down Expand Up @@ -105,7 +106,7 @@ pub struct OutboundPolicyClient {

#[derive(Debug)]
struct GrpcHttp {
tx: hyper::client::conn::SendRequest<tonic::body::BoxBody>,
tx: hyper::client::conn::http2::SendRequest<tonic::body::BoxBody>,
}

async fn get_policy_controller_pod(client: &kube::Client) -> Result<String> {
Expand Down Expand Up @@ -322,19 +323,19 @@ impl GrpcHttp {
where
I: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static,
{
let (tx, conn) = hyper::client::conn::Builder::new()
.http2_only(true)
.handshake(io)
.await?;
let (tx, conn) =
hyper::client::conn::http2::Builder::new(hyper_util::rt::TokioExecutor::new())
.handshake(hyper_util::rt::TokioIo::new(io))
.await?;
tokio::spawn(conn);
Ok(Self { tx })
}
}

impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp {
type Response = hyper::Response<hyper::Body>;
impl tower::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp {
type Response = hyper::Response<hyper::body::Incoming>;
type Error = hyper::Error;
type Future = hyper::client::conn::ResponseFuture;
type Future = future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
Expand All @@ -355,7 +356,9 @@ impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp
);
parts.uri = hyper::Uri::from_parts(uri).unwrap();

self.tx.call(hyper::Request::from_parts(parts, body))
self.tx
.send_request(hyper::Request::from_parts(parts, body))
.boxed()
}
}

Expand Down
16 changes: 6 additions & 10 deletions policy-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub async fn create_ready_pod(client: &kube::Client, pod: k8s::Pod) -> k8s::Pod
ip = %pod
.status.as_ref().expect("pod must have a status")
.pod_ips.as_ref().unwrap()[0]
.ip.as_deref().expect("pod ip must be set"),
.ip,
containers = ?pod
.spec.as_ref().expect("pod must have a spec")
.containers.iter().map(|c| &*c.name).collect::<Vec<_>>(),
Expand Down Expand Up @@ -769,15 +769,11 @@ pub async fn await_service_account(client: &kube::Client, ns: &str, name: &str)
.expect("serviceaccounts watch must not fail");
tracing::info!(?ev);
match ev {
kube::runtime::watcher::Event::Restarted(sas) => {
if sas.iter().any(|sa| sa.name_unchecked() == name) {
return;
}
}
kube::runtime::watcher::Event::Applied(sa) => {
if sa.name_unchecked() == name {
return;
}
kube::runtime::watcher::Event::InitApply(sa)
| kube::runtime::watcher::Event::Apply(sa)
if sa.name_unchecked() == name =>
{
return
}
_ => {}
}
Expand Down

0 comments on commit f60fc0c

Please sign in to comment.