From 80acece3e52f88769e2ec30350af6066bd3c06ba Mon Sep 17 00:00:00 2001 From: Jessie Chatham Spencer Date: Wed, 23 Nov 2022 17:23:33 +0100 Subject: [PATCH] Implement Ephemeral Containers subresource Signed-off-by: Jessie Chatham Spencer --- kube-client/src/api/subresource.rs | 218 +++++++++++++++++++++++++++-- kube-client/src/lib.rs | 148 +++++++++++++++++++- 2 files changed, 349 insertions(+), 17 deletions(-) diff --git a/kube-client/src/api/subresource.rs b/kube-client/src/api/subresource.rs index f1876737b..269a36ba1 100644 --- a/kube-client/src/api/subresource.rs +++ b/kube-client/src/api/subresource.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use futures::Stream; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use std::fmt::Debug; use crate::{ @@ -128,6 +128,196 @@ where } } +// ---------------------------------------------------------------------------- +// Ephemeral containers +// ---------------------------------------------------------------------------- + +/// Marker trait for objects that support the ephemeral containers sub resource. +pub trait Ephemeral {} + +impl Ephemeral for k8s_openapi::api::core::v1::Pod {} + +impl Api +where + K: Clone + DeserializeOwned + Ephemeral, +{ + /// Replace the ephemeral containers sub resource entirely. + /// + /// This functions in the same way as [`Api::replace`] except only `.spec.ephemeralcontainers` is replaced, everything else is ignored. + /// + /// Note that ephemeral containers may **not** be changed or removed once attached to a pod. + /// + /// + /// You way want to patch the underlying resource to gain access to the main container process, + /// see the [documentation](https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/) for `sharedProcessNamespace`. + /// + /// See the Kubernetes [documentation](https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/#what-is-an-ephemeral-container) for more details. + /// + /// [`Api::patch_ephemeral_containers`] may be more ergonomic, as you can will avoid having to first fetch the + /// existing subresources with an approriate merge strategy, see the examples for more details. + /// + /// Example of using `replace_ephemeral_containers`: + /// + /// ```no_run + /// # use k8s_openapi::api::core::v1::Pod; + /// # use kube::{Api, Client, api::PostParams}; + /// # async fn wrapper() -> Result<(), Box> { + /// # let client = Client::try_default().await?; + /// let pods: Api = Api::namespaced(client, "apps"); + /// let pp = PostParams::default(); + /// + /// // Get pod object with ephemeral containers. + /// let mut mypod = pods.get_ephemeral_containers("mypod").await?; + /// + /// // If there were existing ephemeral containers, we would have to append + /// // new containers to the list before calling replace_ephemeral_containers. + /// assert_eq!(mypod.spec.as_mut().unwrap().ephemeral_containers, None); + /// + /// // Add an ephemeral container to the pod object. + /// mypod.spec.as_mut().unwrap().ephemeral_containers = Some(serde_json::from_value(serde_json::json!([ + /// { + /// "name": "myephemeralcontainer", + /// "image": "busybox:1.34.1", + /// "command": ["sh", "-c", "sleep 20"], + /// }, + /// ]))?); + /// + /// pods.replace_ephemeral_containers("mypod", &pp, &mypod).await?; + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn replace_ephemeral_containers(&self, name: &str, pp: &PostParams, data: &K) -> Result + where + K: Serialize, + { + let mut req = self + .request + .replace_subresource( + "ephemeralcontainers", + name, + pp, + serde_json::to_vec(data).map_err(Error::SerdeError)?, + ) + .map_err(Error::BuildRequest)?; + req.extensions_mut().insert("replace_ephemeralcontainers"); + self.client.request::(req).await + } + + /// Patch the ephemeral containers sub resource + /// + /// If an invalid patch is provided the method will **not** always return + /// an error and no changes will be made to the target object in + /// the cluster, see the examples below. + /// + /// Any partial object containing the ephemeral containers + /// sub resource is valid as long as the complete structure + /// for the object is present, as shown below. + /// + /// You way want to patch the underlying resource to gain access to the main container process, + /// see the [docs](https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/) for `sharedProcessNamespace`. + /// + /// Ephemeral containers may **not** be changed or removed once attached to a pod. + /// Therefore if the chosen merge strategy overwrites the existing ephemeral containers, + /// you will have to fetch the existing ephemeral containers first. + /// In order to append your new ephemeral containers to the existing list before patching. See some examples and + /// discussion related to merge strategies in Kubernetes + /// [here](https://kubernetes.io/docs/tasks/manage-kubernetes-objects/update-api-object-kubectl-patch/#use-a-json-merge-patch-to-update-a-deployment). The example below uses a strategic merge patch which does not require + /// + /// See the `Kubernetes` [documentation](https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/) + /// for more information about ephemeral containers. + /// + /// **Valid and invalid Examples for patching a pod** + /// + /// Valid as this can be merged with a complete Pod object: + /// ```rust,no_run + /// let patch = serde_json::json!({ + /// "spec":{ + /// "ephemeralContainers": [ + /// { + /// "name": "myephemeralcontainer", + /// "image": "busybox:1.34.1", + /// "command": ["sh", "-c", "sleep 20"], + /// }, + /// ] + /// }}); + /// + /// ``` + /// + /// Not valid as the outer layer of the `Pod` object is missing, instead + /// we have provided a partial `Pod` spec. Again note that no error will + /// be returned, the patch will simply have no effect. + /// ```rust,no_run + /// let patch = serde_json::json!( + /// { + /// "ephemeralContainers": [ + /// { + /// "name": "myephemeralcontainer", + /// "image": "busybox:1.34.1", + /// "command": ["sh", "-c", "sleep 20"], + /// }, + /// ] + /// }); + /// ``` + /// + /// Example of using `patch_ephemeral_containers`: + /// + /// ```rust,no_run + /// # use kube::{api::{Api, PatchParams, Patch}, Client}; + /// # use k8s_openapi::api::core::v1::Pod; + /// # async fn wrapper() -> Result<(), Box> { + /// # let client = Client::try_default().await?; + /// let pods: Api = Api::namespaced(client, "apps"); + /// let pp = PatchParams::default(); // stratetgic merge patch + /// + /// // Note that the strategic merge patch will concatenate the + /// // lists of ephemeral containers so we avoid having to fetch the + /// // current list and append to it manually. + /// let patch = serde_json::json!({ + /// "spec":{ + /// "ephemeralContainers": [ + /// { + /// "name": "myephemeralcontainer", + /// "image": "busybox:1.34.1", + /// "command": ["sh", "-c", "sleep 20"], + /// }, + /// ] + /// }}); + /// + /// pods.patch_ephemeral_containers("mypod", &pp, &Patch::Strategic(patch)).await?; + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn patch_ephemeral_containers( + &self, + name: &str, + pp: &PatchParams, + patch: &Patch

, + ) -> Result { + let mut req = self + .request + .patch_subresource("ephemeralcontainers", name, pp, patch) + .map_err(Error::BuildRequest)?; + + req.extensions_mut().insert("patch_ephemeralcontainers"); + self.client.request::(req).await + } + + /// Get the named resource with the ephemeral containers subresource. + /// + /// This returns the whole K, with metadata and spec. + pub async fn get_ephemeral_containers(&self, name: &str) -> Result { + let mut req = self + .request + .get_subresource("ephemeralcontainers", name) + .map_err(Error::BuildRequest)?; + + req.extensions_mut().insert("get_ephemeralcontainers"); + self.client.request::(req).await + } +} + // ---------------------------------------------------------------------------- // TODO: Replace examples with owned custom resources. Bad practice to write to owned objects @@ -154,11 +344,10 @@ where /// NB: Requires that the resource has a status subresource. /// /// ```no_run - /// use kube::{api::{Api, PatchParams, Patch}, Client}; - /// use k8s_openapi::api::batch::v1::Job; - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// let client = Client::try_default().await?; + /// # use kube::{api::{Api, PatchParams, Patch}, Client}; + /// # use k8s_openapi::api::batch::v1::Job; + /// # async fn wrapper() -> Result<(), Box> { + /// # let client = Client::try_default().await?; /// let jobs: Api = Api::namespaced(client, "apps"); /// let mut j = jobs.get("baz").await?; /// let pp = PatchParams::default(); // json merge patch @@ -169,8 +358,8 @@ where /// }); /// let o = jobs.patch_status("baz", &pp, &Patch::Merge(data)).await?; /// assert_eq!(o.status.unwrap().succeeded, Some(2)); - /// Ok(()) - /// } + /// # Ok(()) + /// # } /// ``` pub async fn patch_status( &self, @@ -192,18 +381,17 @@ where /// You can leave out the `.spec` entirely from the serialized output. /// /// ```no_run - /// use kube::{api::{Api, PostParams}, Client}; - /// use k8s_openapi::api::batch::v1::{Job, JobStatus}; - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// let client = Client::try_default().await?; + /// # use kube::{api::{Api, PostParams}, Client}; + /// # use k8s_openapi::api::batch::v1::Job; + /// # async fn wrapper() -> Result<(), Box> { + /// # let client = Client::try_default().await?; /// let jobs: Api = Api::namespaced(client, "apps"); /// let mut o = jobs.get_status("baz").await?; // retrieve partial object /// o.status = Some(JobStatus::default()); // update the job part /// let pp = PostParams::default(); /// let o = jobs.replace_status("baz", &pp, serde_json::to_vec(&o)?).await?; - /// Ok(()) - /// } + /// # Ok(()) + /// # } /// ``` pub async fn replace_status(&self, name: &str, pp: &PostParams, data: Vec) -> Result { let mut req = self diff --git a/kube-client/src/lib.rs b/kube-client/src/lib.rs index 93cbb8d35..c296312e3 100644 --- a/kube-client/src/lib.rs +++ b/kube-client/src/lib.rs @@ -137,9 +137,9 @@ mod test { Api, Client, Config, ResourceExt, }; use futures::{StreamExt, TryStreamExt}; - use k8s_openapi::api::core::v1::Pod; + use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec}; use kube_core::{ - params::{DeleteParams, Patch}, + params::{DeleteParams, Patch, PatchParams, PostParams}, response::StatusSummary, }; use serde_json::json; @@ -611,4 +611,148 @@ mod test { csr.delete(csr_name, &DeleteParams::default()).await?; Ok(()) } + + #[tokio::test] + #[ignore = "needs cluster for ephemeral containers operations"] + async fn can_operate_on_ephemeral_containers() -> Result<(), Box> { + let pod: Pod = serde_json::from_value(serde_json::json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "ephemeral-container-test-pod", + "labels": { "app": "kube-rs-test" }, + }, + "spec": { + "restartPolicy": "Never", + "containers": [{ + "name": "busybox", + "image": "busybox:1.34.1", + "command": ["sh", "-c", "sleep 2"], + }], + } + }))?; + + let pod_name = pod.name_any(); + + let client = Client::try_default().await?; + let pods = Api::::default_namespaced(client); + + // Ephemeral containes can only be applied to a running pod, so one must + // be created before any operations are tested. + match pods.create(&Default::default(), &pod).await { + Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()), + Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up + Err(e) => return Err(e.into()), // any other case if a failure + } + + let current_ephemeral_containers = pods + .get_ephemeral_containers(&pod.name_any()) + .await? + .spec + .unwrap() + .ephemeral_containers; + + // TODO: current_ephemeral_containers subtly changes types + // + // We expect no ephemeral containers initially and `get_ephemeral_containers` should + // reflect that. + assert_eq!(current_ephemeral_containers, None); + + let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!( + { + "name": "myephemeralcontainer1", + "image": "busybox:1.34.1", + "command": ["sh", "-c", "sleep 2"], + } + ))?; + + // Attempt to replace ephemeral containers. + let first_patch: Pod = serde_json::from_value(json!({ + "metadata": { "name": pod_name }, + "spec":{ + "ephemeralContainers": [ busybox_eph ] + }}))?; + + let current_containers = pods + .replace_ephemeral_containers(&pod_name, &PostParams::default(), &first_patch) + .await? + .spec + .unwrap() + .ephemeral_containers + .expect("could find ephemeral container"); + + let expected_container = &first_patch.spec.unwrap().ephemeral_containers.unwrap()[0]; + + // Note that we can't compare the whole ephemeral containers object, as some fields + // are set by the cluster. We therefore compare the fields specified in the patch. + assert_eq!(current_containers.len(), 1); + assert_eq!(current_containers[0].name, expected_container.name); + assert_eq!(current_containers[0].image, expected_container.image); + assert_eq!(current_containers[0].command, expected_container.command); + + // Attempt to patch ephemeral containers. + + busybox_eph = serde_json::from_value(json!( + { + "name": "myephemeralcontainer2", + "image": "busybox:1.35.0", + "command": ["sh", "-c", "sleep 1"], + } + ))?; + + let second_patch: Pod = serde_json::from_value(json!({ + "spec": { + "ephemeralContainers": [ busybox_eph ] + }}))?; + + let current_containers = pods + .patch_ephemeral_containers( + &pod_name, + &PatchParams::apply("kubers-test"), + &Patch::Apply(second_patch.clone()), + ) + .await? + .spec + .unwrap() + .ephemeral_containers + .expect("could find ephemeral container"); + + let expected_container = &second_patch.spec.unwrap().ephemeral_containers.unwrap()[0]; + + // There should only be 2 ephemeral containers at this point, + // one from each patch + assert_eq!(current_containers.len(), 2); + + let new_container = current_containers + .iter() + .find(|c| c.name == expected_container.name) + .expect("could find myephemeralcontainer2"); + + // Note that we can't compare the whole ephemeral container object, as some fields + // get set in the cluster. We therefore compare the fields specified in the patch. + assert_eq!(new_container.image, expected_container.image); + assert_eq!(new_container.command, expected_container.command); + + // Attempt to get ephemeral containers. + + let expected_containers = current_containers; + + let current_containers = pods + .get_ephemeral_containers(&pod.name_any()) + .await? + .spec + .unwrap() + .ephemeral_containers + .unwrap(); + + assert_eq!(current_containers, expected_containers); + + pods.delete(&pod.name_any(), &DeleteParams::default()) + .await? + .map_left(|pdel| { + assert_eq!(pdel.name_any(), pod.name_any()); + }); + + Ok(()) + } }