diff --git a/project/common/src/lib.rs b/project/common/src/lib.rs
index 3a614fbc3..bb8ab36b4 100644
--- a/project/common/src/lib.rs
+++ b/project/common/src/lib.rs
@@ -1066,3 +1066,45 @@ pub struct Endpoint {
     #[serde(default)]
     pub subsets: Vec,
 }
+
+/// The Deployment fields are almost the same as those of ReplicaSet
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+#[serde(rename_all = "camelCase")]
+pub struct DeploymentSpec {
+    #[serde(default = "default_replicas")]
+    pub replicas: i32,
+    pub selector: LabelSelector,
+    pub template: PodTemplateSpec,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct DeploymentStatus {
+    #[serde(default)]
+    pub replicas: i32,
+    // Total number of pods that use the desired pod template.
+    #[serde(default)]
+    pub updated_replicas: i32,
+    #[serde(default)]
+    pub ready_replicas: i32,
+    // Total number of available pods, i.e. pods that have been ready for at least minReadySeconds.
+    #[serde(default)]
+    pub available_replicas: i32,
+    #[serde(default)]
+    pub unavailable_replicas: i32,
+    // Collision count used for hash collision resolution.
+    #[serde(default)]
+    pub collision_count: i32,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Deployment {
+    #[serde(rename = "apiVersion")]
+    pub api_version: String,
+    #[serde(rename = "kind")]
+    pub kind: String,
+    pub metadata: ObjectMeta,
+    pub spec: DeploymentSpec,
+    #[serde(default)]
+    pub status: DeploymentStatus,
+}
diff --git a/project/rks/src/api/xlinestore.rs b/project/rks/src/api/xlinestore.rs
index 3feaa5686..4e5f44394 100644
--- a/project/rks/src/api/xlinestore.rs
+++ b/project/rks/src/api/xlinestore.rs
@@ -581,13 +581,104 @@ impl XlineStore {
         let (watcher, stream) = client.watch(key_prefix, Some(opts)).await?;
         Ok((watcher, stream))
     }
+    /// Get all deployments as a snapshot with the current revision
+    pub async fn deployments_snapshot_with_rev(&self) -> Result<(Vec<(String, String)>, i64)> {
+        let prefix = "/registry/deployments/";
+        let opts = Some(GetOptions::new().with_prefix());
+        let mut client = self.client.write().await;
+        let resp = client.get(prefix, opts).await?;
+
+        let mut items = Vec::new();
+        let rev = resp.header().unwrap().revision();
+
+        for kv in resp.kvs() {
+            let key = String::from_utf8_lossy(kv.key()).replace("/registry/deployments/", "");
+            let yaml = String::from_utf8_lossy(kv.value()).to_string();
+            items.push((key, yaml));
+        }
+
+        Ok((items, rev))
+    }
+
+    /// Watch deployments starting from a specific revision
+    pub async fn watch_deployments(&self, start_rev: i64) -> Result<(Watcher, WatchStream)> {
+        let key_prefix = "/registry/deployments/".to_string();
+        let opts = WatchOptions::new()
+            .with_prefix()
+            .with_prev_key()
+            .with_start_revision(start_rev);
+        let mut client = self.client.write().await;
+        let (watcher, stream) = client.watch(key_prefix, Some(opts)).await?;
+        Ok((watcher, stream))
+    }
+
+    /// Insert a deployment YAML definition into xline.
+    pub async fn insert_deployment_yaml(&self, deploy_name: &str, deploy_yaml: &str) -> Result<()> {
+        let key = format!("/registry/deployments/{deploy_name}");
+        let mut client = self.client.write().await;
+        client
+            .put(key, deploy_yaml, Some(PutOptions::new()))
+            .await?;
+        Ok(())
+    }
+
+    /// Get a deployment YAML definition from xline.
+    pub async fn get_deployment_yaml(&self, deploy_name: &str) -> Result<Option<String>> {
+        let key = format!("/registry/deployments/{deploy_name}");
+        let mut client = self.client.write().await;
+        let resp = client.get(key, None).await?;
+        Ok(resp
+            .kvs()
+            .first()
+            .map(|kv| String::from_utf8_lossy(kv.value()).to_string()))
+    }
+
+    /// Get a deployment object from xline.
+    pub async fn get_deployment(&self, deploy_name: &str) -> Result<Option<Deployment>> {
+        if let Some(yaml) = self.get_deployment_yaml(deploy_name).await? {
+            let deployment: Deployment = serde_yaml::from_str(&yaml)?;
+            Ok(Some(deployment))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// List all deployments (deserialize values).
+    pub async fn list_deployments(&self) -> Result<Vec<Deployment>> {
+        let key = "/registry/deployments/".to_string();
+        let mut client = self.client.write().await;
+        let resp = client
+            .get(key.clone(), Some(GetOptions::new().with_prefix()))
+            .await?;
+
+        let deployments: Vec<Deployment> = resp
+            .kvs()
+            .iter()
+            .filter_map(|kv| {
+                let yaml_str = String::from_utf8_lossy(kv.value());
+                serde_yaml::from_str::<Deployment>(&yaml_str).ok()
+            })
+            .collect();
+
+        Ok(deployments)
+    }
+
+    /// Delete a deployment from xline.
+    pub async fn delete_deployment(&self, deploy_name: &str) -> Result<()> {
+        self.delete_object(
+            ResourceKind::Deployment,
+            deploy_name,
+            DeletePropagationPolicy::Background,
+        )
+        .await
+    }

     pub async fn get_object_yaml(&self, kind: ResourceKind, name: &str) -> Result<Option<String>> {
         match kind {
             ResourceKind::Pod => self.get_pod_yaml(name).await,
             ResourceKind::Service => self.get_service_yaml(name).await,
             // TODO
-            ResourceKind::Deployment => todo!(),
+            ResourceKind::Deployment => self.get_deployment_yaml(name).await,
             ResourceKind::ReplicaSet => self.get_replicaset_yaml(name).await,
             ResourceKind::Endpoint => self.get_endpoint_yaml(name).await,
             ResourceKind::Unknown => Ok(None),
@@ -604,7 +695,7 @@ impl XlineStore {
             ResourceKind::Pod => self.insert_pod_yaml(name, yaml).await,
             ResourceKind::Service => self.insert_service_yaml(name, yaml).await,
             // TODO
-            ResourceKind::Deployment => todo!(),
+            ResourceKind::Deployment => self.insert_deployment_yaml(name, yaml).await,
             ResourceKind::ReplicaSet => self.insert_replicaset_yaml(name, yaml).await,
             ResourceKind::Endpoint => self.insert_endpoint_yaml(name, yaml).await,
             ResourceKind::Unknown => Ok(()),
diff --git a/project/rks/src/controllers/deployment.rs b/project/rks/src/controllers/deployment.rs
new file mode 100644
index 000000000..8b11b7563
--- /dev/null
+++ b/project/rks/src/controllers/deployment.rs
@@ -0,0 +1,466 @@
+use crate::api::xlinestore::XlineStore;
+use crate::controllers::manager::{Controller, ResourceWatchResponse, WatchEvent};
+use anyhow::{Result, anyhow};
+use async_trait::async_trait;
+use common::*;
+use log::{debug, error, info};
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+pub struct DeploymentController {
+    store: Arc<XlineStore>,
+}
+
+impl DeploymentController {
+    pub fn new(store: Arc<XlineStore>) -> Self {
+        Self { store }
+    }
+
+    /// Reconcile a single deployment by name
+    async fn reconcile_by_name(&self, name: &str) -> Result<()> {
+        let yaml = self.store.get_deployment_yaml(name).await?;
+
+        if yaml.is_none() {
+            info!("Deployment {} not found, skipping reconciliation", name);
+            return Ok(());
+        }
+
+        let deployment: Deployment = serde_yaml::from_str(&yaml.unwrap())?;
+        self.reconcile_deployment(deployment).await
+    }
+
+    async fn reconcile_deployment(&self, deployment: Deployment) -> Result<()> {
+        let deploy_name = deployment.metadata.name.clone();
+
+        // Check if deployment is being deleted
+        if deployment.metadata.deletion_timestamp.is_some() {
+            info!("Deployment {} is being deleted", deploy_name);
+            return self.handle_deletion(&deployment).await;
+        }
+
+        info!("Reconciling deployment: {}", deploy_name);
+
+        // Get all ReplicaSets owned by this deployment
+        let all_rs = self.store.list_replicasets().await?;
+        let owned_rs: Vec<ReplicaSet> = all_rs
+            .into_iter()
+            .filter(|rs| self.is_owned_by(&rs.metadata, &deployment.metadata))
+            .collect();
+
+        // Get or create the ReplicaSet for the current spec
+        let current_rs = self
+            .get_or_create_replicaset(&deployment, &owned_rs)
+            .await?;
+
+        // Scale the current ReplicaSet to match the deployment's desired replicas
+        self.ensure_replicas(&current_rs, deployment.spec.replicas)
+            .await?;
+
+        // Update deployment status
+        self.update_deployment_status(&deployment).await?;
+
+        info!("Successfully reconciled deployment: {}", deploy_name);
+        Ok(())
+    }
+
+    async fn handle_deletion(&self, deployment: &Deployment) -> Result<()> {
+        // The garbage collector will handle cascading deletion of owned ReplicaSets
+        info!(
+            "Deployment {} deletion is handled by garbage collector",
+            deployment.metadata.name
+        );
+        // TODO: do we need to wait for the GC to clean up child resources?
+        Ok(())
+    }
+    /// Check whether a child resource is owned by the given parent resource
+    fn is_owned_by(&self, child_meta: &ObjectMeta, parent_meta: &ObjectMeta) -> bool {
+        if let Some(owner_refs) = &child_meta.owner_references {
+            owner_refs.iter().any(|owner_ref| {
+                owner_ref.uid == parent_meta.uid && owner_ref.kind == ResourceKind::Deployment
+            })
+        } else {
+            false
+        }
+    }
+
+    async fn get_or_create_replicaset(
+        &self,
+        deployment: &Deployment,
+        owned_rs: &[ReplicaSet],
+    ) -> Result<ReplicaSet> {
+        // Check if a ReplicaSet with the current template already exists
+        for rs in owned_rs {
+            if self.replicaset_matches_deployment(rs, deployment) {
+                info!("Found existing ReplicaSet: {}", rs.metadata.name);
+                return Ok(rs.clone());
+            }
+        }
+
+        // Create a new ReplicaSet
+        info!(
+            "Creating new ReplicaSet for deployment: {}",
+            deployment.metadata.name
+        );
+        let new_rs = self.create_replicaset(deployment).await?;
+        Ok(new_rs)
+    }
+
+    fn replicaset_matches_deployment(&self, rs: &ReplicaSet, deployment: &Deployment) -> bool {
+        // Compare pod template specs using YAML serialization
+        let rs_yaml = serde_yaml::to_string(&rs.spec.template.spec).unwrap_or_default();
+        let deploy_yaml = serde_yaml::to_string(&deployment.spec.template.spec).unwrap_or_default();
+        rs_yaml == deploy_yaml
+    }
+
+    async fn create_replicaset(&self, deployment: &Deployment) -> Result<ReplicaSet> {
+        let deploy_name = &deployment.metadata.name;
+
+        let collision_count = deployment.status.collision_count;
+        let template_hash = self.generate_hash(&deployment.spec.template, collision_count);
+        let rs_name = format!("{}-{}", deploy_name, template_hash);
+
+        // Check if a ReplicaSet with this name already exists
+        if let Some(existing_yaml) = self.store.get_replicaset_yaml(&rs_name).await?
{ + let existing_rs: ReplicaSet = serde_yaml::from_str(&existing_yaml)?; + + // Check if it's owned by this deployment + if let Some(owner_refs) = &existing_rs.metadata.owner_references { + if owner_refs.iter().any(|owner| { + owner.kind == ResourceKind::Deployment && owner.uid == deployment.metadata.uid + }) { + // Already exists and owned by us, check if template matches + if self.replicaset_matches_deployment(&existing_rs, deployment) { + info!( + "ReplicaSet {} already exists for deployment {}", + rs_name, deploy_name + ); + return Ok(existing_rs); + } else { + // Hash collision: increment collision_count and retry + info!( + "Hash collision detected for {}, incrementing collision_count", + rs_name + ); + self.increment_collision_count(deployment).await?; + return Err(anyhow!( + "Hash collision detected, collision_count incremented, will retry on next reconcile" + )); + } + } else { + // Owned by different deployment - rare hash collision + info!( + "ReplicaSet {} exists but owned by different deployment, incrementing collision_count", + rs_name + ); + self.increment_collision_count(deployment).await?; + return Err(anyhow!( + "Hash collision with different deployment, collision_count incremented" + )); + } + } + } + + let mut rs_metadata = ObjectMeta { + name: rs_name.clone(), + namespace: deployment.metadata.namespace.clone(), + labels: deployment.spec.selector.match_labels.clone(), + ..Default::default() + }; + + // Set owner reference to enable garbage collection + rs_metadata.owner_references = Some(vec![OwnerReference { + api_version: deployment.api_version.clone(), + kind: ResourceKind::Deployment, + name: deploy_name.clone(), + uid: deployment.metadata.uid, + controller: true, + block_owner_deletion: Some(true), + }]); + + // Prepare pod template with merged labels + let mut template = deployment.spec.template.clone(); + for (k, v) in &deployment.spec.selector.match_labels { + template.metadata.labels.insert(k.clone(), v.clone()); + } + + let rs = ReplicaSet { + api_version: "v1".to_string(), + kind: "ReplicaSet".to_string(), + metadata: rs_metadata, + spec: ReplicaSetSpec { + replicas: deployment.spec.replicas, + selector: deployment.spec.selector.clone(), + template, + }, + status: ReplicaSetStatus::default(), + }; + + let rs_yaml = serde_yaml::to_string(&rs)?; + self.store + .insert_replicaset_yaml(&rs_name, &rs_yaml) + .await?; + + info!( + "Created ReplicaSet {} for deployment {}", + rs_name, deploy_name + ); + Ok(rs) + } + + /// Generate a stable hash of the pod template for replicaset name + /// Use collision_count to avoid hash collisions + /// One pod template always maps to one hash + fn generate_hash(&self, template: &PodTemplateSpec, collision_count: i32) -> String { + let template_yaml = serde_yaml::to_string(&template.spec).unwrap_or_default(); + let mut hasher = DefaultHasher::new(); + template_yaml.hash(&mut hasher); + + // Add collision_count to hash if non-zero + if collision_count > 0 { + collision_count.hash(&mut hasher); + } + + let hash = hasher.finish(); + + // Convert to hex and take first 10 chars + format!("{:x}", hash).chars().take(10).collect() + } + + /// Increment collision_count in deployment status for hash collision resolution + async fn increment_collision_count(&self, deployment: &Deployment) -> Result<()> { + let deploy_name = &deployment.metadata.name; + + let yaml = self + .store + .get_deployment_yaml(deploy_name) + .await? 
+ .ok_or_else(|| anyhow!("Deployment {} not found", deploy_name))?; + + let mut deploy: Deployment = serde_yaml::from_str(&yaml)?; + let new_count = deploy.status.collision_count + 1; + deploy.status.collision_count = new_count; + + let updated_yaml = serde_yaml::to_string(&deploy)?; + self.store + .insert_deployment_yaml(deploy_name, &updated_yaml) + .await?; + + info!( + "Incremented collision_count to {} for deployment {}", + new_count, deploy_name + ); + Ok(()) + } + + /// Ensure the ReplicaSet has the desired number of replicas + /// Just scale up/down the replicas count + async fn ensure_replicas(&self, rs: &ReplicaSet, desired_replicas: i32) -> Result<()> { + let rs_name = &rs.metadata.name; + + if rs.spec.replicas == desired_replicas { + info!( + "ReplicaSet {} already has desired replicas: {}", + rs_name, desired_replicas + ); + return Ok(()); + } + + info!( + "Scaling ReplicaSet {} from {} to {} replicas", + rs_name, rs.spec.replicas, desired_replicas + ); + + // Update ReplicaSet replicas + let rs_yaml = self + .store + .get_replicaset_yaml(rs_name) + .await? + .ok_or_else(|| anyhow!("ReplicaSet {} not found", rs_name))?; + + let mut updated_rs: ReplicaSet = serde_yaml::from_str(&rs_yaml)?; + updated_rs.spec.replicas = desired_replicas; + + let updated_yaml = serde_yaml::to_string(&updated_rs)?; + self.store + .insert_replicaset_yaml(rs_name, &updated_yaml) + .await?; + + Ok(()) + } + /// In the end, update the deployment status based on its ReplicaSets + async fn update_deployment_status(&self, deployment: &Deployment) -> Result<()> { + let deploy_name = &deployment.metadata.name; + + // Get all ReplicaSets owned by this deployment + let all_rs = self.store.list_replicasets().await?; + let owned_rs: Vec = all_rs + .into_iter() + .filter(|rs| self.is_owned_by(&rs.metadata, &deployment.metadata)) + .collect(); + + // Calculate status from all owned ReplicaSets + let mut total_replicas = 0; + let mut ready_replicas = 0; + let mut available_replicas = 0; + let mut updated_replicas = 0; + + for rs in &owned_rs { + total_replicas += rs.status.replicas; + ready_replicas += rs.status.ready_replicas; + available_replicas += rs.status.available_replicas; + + // For now, consider all replicas as "updated" since we only have one RS + if self.replicaset_matches_deployment(rs, deployment) { + updated_replicas = rs.status.replicas; + } + } + + // Update deployment status + let yaml = self + .store + .get_deployment_yaml(deploy_name) + .await? 
+ .ok_or_else(|| anyhow!("Deployment {} not found", deploy_name))?; + + let mut deploy: Deployment = serde_yaml::from_str(&yaml)?; + deploy.status.replicas = total_replicas; + deploy.status.ready_replicas = ready_replicas; + deploy.status.available_replicas = available_replicas; + deploy.status.updated_replicas = updated_replicas; + deploy.status.unavailable_replicas = (deployment.spec.replicas - available_replicas).max(0); + + let updated_yaml = serde_yaml::to_string(&deploy)?; + self.store + .insert_deployment_yaml(deploy_name, &updated_yaml) + .await?; + + info!( + "Updated status for deployment {}: replicas={}/{}, ready={}, available={}", + deploy_name, + total_replicas, + deployment.spec.replicas, + ready_replicas, + available_replicas + ); + + Ok(()) + } +} + +#[async_trait] +impl Controller for DeploymentController { + fn name(&self) -> &'static str { + "deployment" + } + + fn watch_resources(&self) -> Vec { + vec![ResourceKind::Deployment, ResourceKind::ReplicaSet] + } + + async fn handle_watch_response(&mut self, response: &ResourceWatchResponse) -> Result<()> { + match response.kind { + ResourceKind::Deployment => { + debug!( + "DeploymentController handling Deployment event: key={}", + response.key + ); + + // Reconcile on Add events or when the spec has changed + let mut should_reconcile = false; + match &response.event { + WatchEvent::Add { .. } => { + should_reconcile = true; + } + WatchEvent::Update { old_yaml, new_yaml } => { + let old_deploy: Deployment = serde_yaml::from_str(old_yaml)?; + let new_deploy: Deployment = serde_yaml::from_str(new_yaml)?; + // Only reconcile if spec changed (ignore status-only updates) + let old_template_yaml = + serde_yaml::to_string(&old_deploy.spec.template.spec) + .unwrap_or_default(); + let new_template_yaml = + serde_yaml::to_string(&new_deploy.spec.template.spec) + .unwrap_or_default(); + if old_deploy.spec.replicas != new_deploy.spec.replicas + || old_template_yaml != new_template_yaml + { + should_reconcile = true; + } + } + WatchEvent::Delete { .. 
} => {} + } + + if should_reconcile && let Err(e) = self.reconcile_by_name(&response.key).await { + error!("Failed to reconcile deployment {}: {}", response.key, e); + return Err(e); + } + } + ResourceKind::ReplicaSet => { + debug!( + "DeploymentController handling ReplicaSet event: key={}", + response.key + ); + + // When ReplicaSet status changes, update parent Deployment status + match &response.event { + WatchEvent::Update { old_yaml, new_yaml } => { + let old_rs: ReplicaSet = serde_yaml::from_str(old_yaml)?; + let new_rs: ReplicaSet = serde_yaml::from_str(new_yaml)?; + + // Only update if status changed (compare using YAML serialization) + let old_status_yaml = + serde_yaml::to_string(&old_rs.status).unwrap_or_default(); + let new_status_yaml = + serde_yaml::to_string(&new_rs.status).unwrap_or_default(); + if old_status_yaml != new_status_yaml + && let Err(e) = self.update_deployment_for_replicaset(&new_rs).await + { + error!( + "Failed to update deployment for ReplicaSet {}: {}", + response.key, e + ); + } + } + WatchEvent::Add { yaml } => { + let rs: ReplicaSet = serde_yaml::from_str(yaml)?; + if let Err(e) = self.update_deployment_for_replicaset(&rs).await { + error!( + "Failed to update deployment for new ReplicaSet {}: {}", + response.key, e + ); + } + } + _ => {} + } + } + _ => {} + } + Ok(()) + } +} + +impl DeploymentController { + /// Update deployment status when ReplicaSet changes + async fn update_deployment_for_replicaset(&self, rs: &ReplicaSet) -> Result<()> { + // Find the owning Deployment + if let Some(owner_refs) = &rs.metadata.owner_references { + for owner_ref in owner_refs { + if owner_ref.kind == ResourceKind::Deployment && owner_ref.controller { + let deployment_name = &owner_ref.name; + + if let Some(yaml) = self.store.get_deployment_yaml(deployment_name).await? 
{ + let deployment: Deployment = serde_yaml::from_str(&yaml)?; + self.update_deployment_status(&deployment).await?; + debug!( + "Updated status for deployment {} due to ReplicaSet {} change", + deployment_name, rs.metadata.name + ); + } + break; + } + } + } + Ok(()) + } +} diff --git a/project/rks/src/controllers/manager.rs b/project/rks/src/controllers/manager.rs index e90915ebf..32b90b360 100644 --- a/project/rks/src/controllers/manager.rs +++ b/project/rks/src/controllers/manager.rs @@ -929,7 +929,132 @@ impl ControllerManager { backoff_ms = (backoff_ms * 2).min(30_000); } }); + // deployments informer with reconnect loop + let mgr_deploy = self.clone(); + let store_deploy = store.clone(); + tokio::spawn(async move { + let mut backoff_ms = 100u64; + loop { + match store_deploy.deployments_snapshot_with_rev().await { + Ok((items, rev)) => { + for (name, _yaml) in items.into_iter() { + let senders = mgr_deploy + .get_senders_by_kind(ResourceKind::Deployment) + .await; + for sender in senders { + let _ = sender + .send(ResourceWatchResponse { + kind: ResourceKind::Deployment, + key: name.clone(), + event: WatchEvent::Add { + yaml: _yaml.clone(), + }, + }) + .await; + } + } + // Start watch from rev+1 to skip snapshot duplication + match store_deploy.watch_deployments(rev + 1).await { + Ok((_watcher, mut stream)) => { + backoff_ms = 100; + loop { + match stream.message().await { + Ok(Some(resp)) => { + for ev in resp.events() { + if let Some(kv) = ev.kv() { + let key = String::from_utf8_lossy(kv.key()) + .replace("/registry/deployments/", ""); + let event_opt = match ev.event_type() { + etcd_client::EventType::Put => { + if let Some(prev_kv) = ev.prev_kv() { + Some(WatchEvent::Update { + old_yaml: + String::from_utf8_lossy( + prev_kv.value(), + ) + .to_string(), + new_yaml: + String::from_utf8_lossy( + kv.value(), + ) + .to_string(), + }) + } else { + Some(WatchEvent::Add { + yaml: String::from_utf8_lossy( + kv.value(), + ) + .to_string(), + }) + } + } + etcd_client::EventType::Delete => { + if let Some(prev_kv) = ev.prev_kv() { + Some(WatchEvent::Delete { + yaml: String::from_utf8_lossy( + prev_kv.value(), + ) + .to_string(), + }) + } else { + log::warn!( + "watch delete event missing prev_kv for key {}", + key + ); + None + } + } + }; + let Some(event) = event_opt else { + continue; + }; + let senders = mgr_deploy + .get_senders_by_kind( + ResourceKind::Deployment, + ) + .await; + for sender in senders { + let _ = sender + .send(ResourceWatchResponse { + kind: ResourceKind::Deployment, + key: key.clone(), + event: event.clone(), + }) + .await; + } + } + } + } + Ok(None) => { + log::info!( + "deployment watch stream closed, will reconnect" + ); + break; + } + Err(e) => { + log::error!( + "deployment watch error: {:?}, will reconnect", + e + ); + break; + } + } + } + } + Err(e) => { + log::error!("failed to start deployment watch: {:?}", e); + } + } + } + Err(e) => { + log::error!("failed to snapshot deployments: {:?}", e); + } + } + sleep(Duration::from_millis(backoff_ms)).await; + backoff_ms = (backoff_ms * 2).min(30_000); + } + }); Ok(()) } diff --git a/project/rks/src/controllers/mod.rs b/project/rks/src/controllers/mod.rs index 38130c20a..cf55b6021 100644 --- a/project/rks/src/controllers/mod.rs +++ b/project/rks/src/controllers/mod.rs @@ -1,5 +1,6 @@ +pub mod deployment; pub mod replicaset; - +pub use deployment::DeploymentController; pub use replicaset::ReplicaSetController; pub mod manager; diff --git a/project/rks/src/main.rs b/project/rks/src/main.rs index d76e19980..7e50adff1 
100644 --- a/project/rks/src/main.rs +++ b/project/rks/src/main.rs @@ -12,7 +12,9 @@ mod vault; use crate::controllers::endpoint_controller::EndpointController; use crate::controllers::garbage_collector::GarbageCollector; -use crate::controllers::{CONTROLLER_MANAGER, ControllerManager, ReplicaSetController}; +use crate::controllers::{ + CONTROLLER_MANAGER, ControllerManager, DeploymentController, ReplicaSetController, +}; use crate::dns::authority::{run_dns_server, setup_dns_nftable}; use crate::network::init; use crate::network::manager::LocalManager; @@ -171,6 +173,7 @@ async fn register_controllers( let gc = GarbageCollector::new(xline_store.clone()); let rs = ReplicaSetController::new(xline_store.clone()); let ep = EndpointController::new(xline_store.clone()); + let deploy = DeploymentController::new(xline_store.clone()); mgr.clone() .register(Arc::new(RwLock::new(gc)), workers) .await?; @@ -180,5 +183,8 @@ async fn register_controllers( mgr.clone() .register(Arc::new(RwLock::new(ep)), workers) .await?; + mgr.clone() + .register(Arc::new(RwLock::new(deploy)), workers) + .await?; Ok(()) } diff --git a/project/rks/tests/test-deployment.rs b/project/rks/tests/test-deployment.rs new file mode 100644 index 000000000..ec2162df9 --- /dev/null +++ b/project/rks/tests/test-deployment.rs @@ -0,0 +1,636 @@ +use anyhow::Result; +use common::*; +use libvault::storage::xline::XlineOptions; +use rks::api::xlinestore::XlineStore; +use rks::controllers::ReplicaSetController; +use rks::controllers::deployment::DeploymentController; +use rks::controllers::garbage_collector::GarbageCollector; +use rks::controllers::manager::ControllerManager; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio::time::{Duration, sleep}; + +/// Test the basic creation of a Deployment and its reconciliation to create a ReplicaSet and Pods +#[tokio::test] +async fn test_deployment_basic_create() -> Result<()> { + // Connect to xline + let endpoints = vec![ + "http://172.20.0.3:2379".to_string(), + "http://172.20.0.4:2379".to_string(), + "http://172.20.0.5:2379".to_string(), + ]; + let opts = XlineOptions { + endpoints, + config: None, + }; + let store = Arc::new(XlineStore::new(opts).await?); + + // Setup test environment with all controllers + let _manager = setup_test_manager(store.clone()).await?; + + // Create deployment + let deployment = create_test_deployment("test-deployment", 3); + + // Save deployment + let yaml = serde_yaml::to_string(&deployment)?; + store + .insert_deployment_yaml("test-deployment", &yaml) + .await?; + + println!("Created deployment: test-deployment"); + + // Wait for reconciliation + println!("Waiting for deployment reconciliation..."); + sleep(Duration::from_secs(3)).await; + + // Verify ReplicaSet was created + let replicasets = store.list_replicasets().await?; + let owned_rs: Vec<_> = replicasets + .iter() + .filter(|rs| { + if let Some(owner_refs) = &rs.metadata.owner_references { + owner_refs.iter().any(|owner| { + owner.kind == ResourceKind::Deployment && owner.name == "test-deployment" + }) + } else { + false + } + }) + .collect(); + + assert!(!owned_rs.is_empty(), "No ReplicaSet created for deployment"); + println!("ReplicaSet created: {}", owned_rs[0].metadata.name); + + // Verify replicas + assert_eq!( + owned_rs[0].spec.replicas, 3, + "ReplicaSet should have 3 replicas" + ); + println!("ReplicaSet has correct replica count: 3"); + + // Verify deployment status was updated + let updated_deployment = store + .get_deployment("test-deployment") + .await? 
+ .expect("Deployment should exist"); + + println!("Deployment status:"); + println!( + " - Total replicas: {}", + updated_deployment.status.replicas + ); + println!( + " - Updated replicas: {}", + updated_deployment.status.updated_replicas + ); + println!( + " - Ready replicas: {}", + updated_deployment.status.ready_replicas + ); + println!( + " - Available replicas: {}", + updated_deployment.status.available_replicas + ); + match store.delete_deployment("test-deployment").await { + Ok(_) => println!("Deleted deployment"), + Err(e) => eprintln!("Failed to delete deployment: {}", e), + } + + for rs in &owned_rs { + match store.delete_replicaset(&rs.metadata.name).await { + Ok(_) => println!("Deleted ReplicaSet: {}", rs.metadata.name), + Err(e) => eprintln!("Failed to delete ReplicaSet {}: {}", rs.metadata.name, e), + } + } + + // Wait a bit for GC to clean up Pods, then manually clean any remaining + sleep(Duration::from_millis(500)).await; + + // Clean up any remaining pods owned by the ReplicaSets + let all_pods = store.list_pods().await?; + for pod in all_pods { + if let Some(owner_refs) = &pod.metadata.owner_references { + for owner_ref in owner_refs { + if owner_ref.kind == ResourceKind::ReplicaSet + && owned_rs.iter().any(|rs| rs.metadata.uid == owner_ref.uid) + { + match store.delete_pod(&pod.metadata.name).await { + Ok(_) => println!("Deleted Pod: {}", pod.metadata.name), + Err(e) => eprintln!("Failed to delete Pod {}: {}", pod.metadata.name, e), + } + break; + } + } + } + } + + println!("Cleanup completed"); + + Ok(()) +} + +/// Test scaling a Deployment and verifying the ReplicaSet updates accordingly +#[tokio::test] +async fn test_deployment_scale() -> Result<()> { + let endpoints = vec![ + "http://172.20.0.3:2379".to_string(), + "http://172.20.0.4:2379".to_string(), + "http://172.20.0.5:2379".to_string(), + ]; + let opts = XlineOptions { + endpoints, + config: None, + }; + let store = Arc::new(XlineStore::new(opts).await?); + + // Setup test environment with all controllers + let _manager = setup_test_manager(store.clone()).await?; + + // Create deployment with 2 replicas + let mut deployment = create_test_deployment("test-scale-deployment", 2); + let yaml = serde_yaml::to_string(&deployment)?; + store + .insert_deployment_yaml("test-scale-deployment", &yaml) + .await?; + + println!("Created deployment with 2 replicas"); + + // Wait for initial reconciliation + sleep(Duration::from_secs(3)).await; + + // Verify initial state + let replicasets = store.list_replicasets().await?; + let rs = replicasets + .iter() + .find(|rs| { + rs.metadata + .owner_references + .as_ref() + .and_then(|refs| refs.iter().find(|r| r.name == "test-scale-deployment")) + .is_some() + }) + .expect("ReplicaSet should exist"); + + assert_eq!(rs.spec.replicas, 2); + println!("Initial ReplicaSet has 2 replicas"); + + // Scale up to 5 replicas + deployment.spec.replicas = 5; + let yaml = serde_yaml::to_string(&deployment)?; + store + .insert_deployment_yaml("test-scale-deployment", &yaml) + .await?; + println!("Scaled deployment to 5 replicas"); + + // Wait longer for reconciliation to complete + sleep(Duration::from_secs(5)).await; + + // Verify scale up + let rs_scaled = store + .get_replicaset_yaml(&rs.metadata.name) + .await? 
+ .expect("ReplicaSet should exist"); + let rs_scaled: ReplicaSet = serde_yaml::from_str(&rs_scaled)?; + + // Cleanup - delete all resources created by the test + match store.delete_deployment("test-scale-deployment").await { + Ok(_) => println!("Deleted deployment"), + Err(e) => eprintln!("Failed to delete deployment: {}", e), + } + + match store.delete_replicaset(&rs_scaled.metadata.name).await { + Ok(_) => println!("Deleted ReplicaSet: {}", rs_scaled.metadata.name), + Err(e) => eprintln!( + "Failed to delete ReplicaSet {}: {}", + rs_scaled.metadata.name, e + ), + } + + // Wait a bit for GC to clean up Pods, then manually clean any remaining + sleep(Duration::from_millis(500)).await; + + // Clean up any remaining pods owned by the ReplicaSet + let all_pods = store.list_pods().await?; + for pod in all_pods { + if let Some(owner_refs) = &pod.metadata.owner_references { + if owner_refs.iter().any(|owner| { + owner.kind == ResourceKind::ReplicaSet && owner.uid == rs_scaled.metadata.uid + }) { + match store.delete_pod(&pod.metadata.name).await { + Ok(_) => println!("Deleted Pod: {}", pod.metadata.name), + Err(e) => eprintln!("Failed to delete Pod {}: {}", pod.metadata.name, e), + } + } + } + } + + println!("Cleanup completed"); + + // Assert after cleanup + assert_eq!(rs_scaled.spec.replicas, 5); + println!("ReplicaSet scaled to 5 replicas"); + + Ok(()) +} + +/// Test that re-applying the same deployment yaml does not cause unnecessary updates +#[tokio::test] +async fn test_deployment_idempotency() -> Result<()> { + let endpoints = vec![ + "http://172.20.0.3:2379".to_string(), + "http://172.20.0.4:2379".to_string(), + "http://172.20.0.5:2379".to_string(), + ]; + let opts = XlineOptions { + endpoints, + config: None, + }; + let store = Arc::new(XlineStore::new(opts).await?); + + // Setup test environment with all controllers + let _manager = setup_test_manager(store.clone()).await?; + + // Create deployment + let deployment = create_test_deployment("test-idempotency-deployment", 3); + let yaml = serde_yaml::to_string(&deployment)?; + store + .insert_deployment_yaml("test-idempotency-deployment", &yaml) + .await?; + + println!("Created deployment: test-idempotency-deployment"); + + // Wait for initial reconciliation + sleep(Duration::from_secs(3)).await; + + // Verify ReplicaSet was created + let replicasets = store.list_replicasets().await?; + let owned_rs: Vec<_> = replicasets + .iter() + .filter(|rs| { + if let Some(owner_refs) = &rs.metadata.owner_references { + owner_refs.iter().any(|owner| { + owner.kind == ResourceKind::Deployment + && owner.name == "test-idempotency-deployment" + }) + } else { + false + } + }) + .collect(); + + assert_eq!(owned_rs.len(), 1, "Should have exactly one ReplicaSet"); + let rs_name = owned_rs[0].metadata.name.clone(); + let rs_uid = owned_rs[0].metadata.uid; + println!("Initial ReplicaSet created: {}", rs_name); + + // Write the same deployment again + store + .insert_deployment_yaml("test-idempotency-deployment", &yaml) + .await?; + println!("Re-inserted deployment with same content"); + + // Wait for reconciliation + sleep(Duration::from_secs(3)).await; + + // Verify still only one ReplicaSet exists + let replicasets_after = store.list_replicasets().await?; + let owned_rs_after: Vec<_> = replicasets_after + .iter() + .filter(|rs| { + if let Some(owner_refs) = &rs.metadata.owner_references { + owner_refs.iter().any(|owner| { + owner.kind == ResourceKind::Deployment + && owner.name == "test-idempotency-deployment" + }) + } else { + false + } + }) + 
.collect(); + + assert_eq!( + owned_rs_after.len(), + 1, + "Should still have exactly one ReplicaSet after re-insert" + ); + assert_eq!( + owned_rs_after[0].metadata.name, rs_name, + "ReplicaSet name should not change" + ); + assert_eq!( + owned_rs_after[0].metadata.uid, rs_uid, + "ReplicaSet UID should not change" + ); + assert_eq!( + owned_rs_after[0].spec.replicas, 3, + "ReplicaSet replicas should remain 3" + ); + println!( + "Idempotency verified: same ReplicaSet {} with replicas=3", + rs_name + ); + + // Verify template is unchanged + let rs_yaml_before = serde_yaml::to_string(&owned_rs[0].spec.template.spec)?; + let rs_yaml_after = serde_yaml::to_string(&owned_rs_after[0].spec.template.spec)?; + assert_eq!( + rs_yaml_before, rs_yaml_after, + "ReplicaSet template should not change" + ); + println!("Template unchanged"); + + // Cleanup + match store.delete_deployment("test-idempotency-deployment").await { + Ok(_) => println!("Deleted deployment"), + Err(e) => eprintln!("Failed to delete deployment: {}", e), + } + + match store.delete_replicaset(&rs_name).await { + Ok(_) => println!("Deleted ReplicaSet: {}", rs_name), + Err(e) => eprintln!("Failed to delete ReplicaSet {}: {}", rs_name, e), + } + + sleep(Duration::from_millis(500)).await; + + let all_pods = store.list_pods().await?; + for pod in all_pods { + if let Some(owner_refs) = &pod.metadata.owner_references { + if owner_refs + .iter() + .any(|owner| owner.kind == ResourceKind::ReplicaSet && owner.uid == rs_uid) + { + match store.delete_pod(&pod.metadata.name).await { + Ok(_) => println!("Deleted Pod: {}", pod.metadata.name), + Err(e) => eprintln!("Failed to delete Pod {}: {}", pod.metadata.name, e), + } + } + } + } + + println!("Cleanup completed"); + + Ok(()) +} + +/// Test concurrent reconciliation attempts do not create duplicate ReplicaSets +#[tokio::test] +async fn test_deployment_concurrent_reconciliation() -> Result<()> { + let endpoints = vec![ + "http://172.20.0.3:2379".to_string(), + "http://172.20.0.4:2379".to_string(), + "http://172.20.0.5:2379".to_string(), + ]; + let opts = XlineOptions { + endpoints, + config: None, + }; + let store = Arc::new(XlineStore::new(opts).await?); + + // Setup test environment with all controllers + let _manager = setup_test_manager(store.clone()).await?; + + // Create deployment + let deployment = create_test_deployment("test-concurrent-deployment", 3); + let yaml = serde_yaml::to_string(&deployment)?; + store + .insert_deployment_yaml("test-concurrent-deployment", &yaml) + .await?; + + println!("Created deployment: test-concurrent-deployment"); + + // Wait for initial reconciliation + sleep(Duration::from_secs(3)).await; + + // Verify ReplicaSet was created + let replicasets = store.list_replicasets().await?; + let owned_rs: Vec<_> = replicasets + .iter() + .filter(|rs| { + if let Some(owner_refs) = &rs.metadata.owner_references { + owner_refs.iter().any(|owner| { + owner.kind == ResourceKind::Deployment + && owner.name == "test-concurrent-deployment" + }) + } else { + false + } + }) + .collect(); + + assert_eq!( + owned_rs.len(), + 1, + "Should have exactly one ReplicaSet initially" + ); + let rs_name = owned_rs[0].metadata.name.clone(); + let rs_uid = owned_rs[0].metadata.uid; + println!("Initial ReplicaSet created: {}", rs_name); + + // Simulate concurrent reconciliation by triggering multiple reconcile events + // This tests that the controller's idempotent logic prevents duplicate creation + println!("Simulating concurrent reconciliation attempts..."); + + let store1 = 
store.clone(); + let store2 = store.clone(); + let store3 = store.clone(); + let yaml1 = yaml.clone(); + let yaml2 = yaml.clone(); + let yaml3 = yaml.clone(); + + // Spawn multiple concurrent writes to trigger reconciliation + let handle1 = tokio::spawn(async move { + store1 + .insert_deployment_yaml("test-concurrent-deployment", &yaml1) + .await + }); + + let handle2 = tokio::spawn(async move { + store2 + .insert_deployment_yaml("test-concurrent-deployment", &yaml2) + .await + }); + + let handle3 = tokio::spawn(async move { + store3 + .insert_deployment_yaml("test-concurrent-deployment", &yaml3) + .await + }); + + // Wait for all concurrent operations to complete + let _ = tokio::try_join!(handle1, handle2, handle3)?; + println!("Concurrent writes completed"); + + // Wait for reconciliation to settle + sleep(Duration::from_secs(5)).await; + + // Verify still only one ReplicaSet exists + let replicasets_after = store.list_replicasets().await?; + let owned_rs_after: Vec<_> = replicasets_after + .iter() + .filter(|rs| { + if let Some(owner_refs) = &rs.metadata.owner_references { + owner_refs.iter().any(|owner| { + owner.kind == ResourceKind::Deployment + && owner.name == "test-concurrent-deployment" + }) + } else { + false + } + }) + .collect(); + + assert_eq!( + owned_rs_after.len(), + 1, + "Should still have exactly one ReplicaSet after concurrent reconciliation" + ); + assert_eq!( + owned_rs_after[0].metadata.name, rs_name, + "ReplicaSet name should not change" + ); + assert_eq!( + owned_rs_after[0].metadata.uid, rs_uid, + "ReplicaSet UID should not change (same instance)" + ); + assert_eq!( + owned_rs_after[0].spec.replicas, 3, + "ReplicaSet replicas should remain 3" + ); + println!( + "Concurrency test passed: only one ReplicaSet {} exists with replicas=3", + rs_name + ); + + // Cleanup + match store.delete_deployment("test-concurrent-deployment").await { + Ok(_) => println!("Deleted deployment"), + Err(e) => eprintln!("Failed to delete deployment: {}", e), + } + + match store.delete_replicaset(&rs_name).await { + Ok(_) => println!("Deleted ReplicaSet: {}", rs_name), + Err(e) => eprintln!("Failed to delete ReplicaSet {}: {}", rs_name, e), + } + + sleep(Duration::from_millis(500)).await; + + let all_pods = store.list_pods().await?; + for pod in all_pods { + if let Some(owner_refs) = &pod.metadata.owner_references { + if owner_refs + .iter() + .any(|owner| owner.kind == ResourceKind::ReplicaSet && owner.uid == rs_uid) + { + match store.delete_pod(&pod.metadata.name).await { + Ok(_) => println!("Deleted Pod: {}", pod.metadata.name), + Err(e) => eprintln!("Failed to delete Pod {}: {}", pod.metadata.name, e), + } + } + } + } + + println!("Cleanup completed"); + + Ok(()) +} + +/// Setup test environment with all necessary controllers +async fn setup_test_manager(store: Arc) -> Result> { + let manager = Arc::new(ControllerManager::new()); + + // Register GarbageCollector for cascading deletion + let gc = GarbageCollector::new(store.clone()); + manager + .clone() + .register(Arc::new(RwLock::new(gc)), 2) + .await?; + + // Register ReplicaSetController to create Pods + let rs_controller = ReplicaSetController::new(store.clone()); + manager + .clone() + .register(Arc::new(RwLock::new(rs_controller)), 2) + .await?; + + // Register DeploymentController + let deploy_controller = DeploymentController::new(store.clone()); + manager + .clone() + .register(Arc::new(RwLock::new(deploy_controller)), 2) + .await?; + + // Start watching in background + let manager_clone = manager.clone(); + let store_clone 
= store.clone(); + tokio::spawn(async move { + if let Err(e) = manager_clone.start_watch(store_clone).await { + eprintln!("Manager watch error: {}", e); + } + }); + + // Give the manager time to start watching and establish etcd connections + sleep(Duration::from_secs(2)).await; + + Ok(manager) +} + +fn create_test_deployment(name: &str, replicas: i32) -> Deployment { + Deployment { + api_version: "v1".to_string(), + kind: "Deployment".to_string(), + metadata: ObjectMeta { + name: name.to_string(), + namespace: "default".to_string(), + labels: { + let mut labels = std::collections::HashMap::new(); + labels.insert("app".to_string(), name.to_string()); + labels + }, + ..Default::default() + }, + spec: DeploymentSpec { + replicas, + selector: LabelSelector { + match_labels: { + let mut labels = std::collections::HashMap::new(); + labels.insert("app".to_string(), name.to_string()); + labels + }, + match_expressions: Vec::new(), + }, + template: PodTemplateSpec { + metadata: ObjectMeta { + name: "".to_string(), + namespace: "default".to_string(), + uid: uuid::Uuid::new_v4(), + labels: { + let mut labels = std::collections::HashMap::new(); + labels.insert("app".to_string(), name.to_string()); + labels + }, + annotations: std::collections::HashMap::new(), + owner_references: None, + creation_timestamp: None, + deletion_timestamp: None, + finalizers: None, + }, + spec: PodSpec { + node_name: None, + containers: vec![ContainerSpec { + name: "nginx".to_string(), + image: "./test-image".to_string(), + ports: Vec::new(), + args: Vec::new(), + resources: None, + liveness_probe: None, + readiness_probe: None, + startup_probe: None, + }], + init_containers: Vec::new(), + tolerations: Vec::new(), + }, + }, + }, + status: DeploymentStatus::default(), + } +} diff --git a/project/rks/tests/test-deployment.yaml b/project/rks/tests/test-deployment.yaml new file mode 100644 index 000000000..5f19284f8 --- /dev/null +++ b/project/rks/tests/test-deployment.yaml @@ -0,0 +1,32 @@ +apiVersion: v1 +kind: Deployment +metadata: + name: nginx-deployment + namespace: default + labels: + app: nginx +spec: + replicas: 3 + selector: + matchLabels: + app: nginx + tier: frontend + template: + metadata: + labels: + app: nginx + tier: frontend + bundle: ./rk8s/project/test/bundles/pause + spec: + containers: + - name: nginx + image: ./rk8s/project/test/bundles/busybox + args: + - "sleep" + - "3600" + ports: + - containerPort: 80 + resources: + limits: + cpu: "500m" + memory: "512Mi"
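Reviewer note on the ReplicaSet naming scheme: the sketch below is not project code. It is a minimal, standalone Rust illustration of what DeploymentController::generate_hash does, assuming the real controller hashes the serde_yaml serialization of the deployment's template.spec (a plain YAML string stands in for it here) and formats the ReplicaSet name as {deployment-name}-{hash}. The helper name rs_name and the sample template string are made up for this example.

// Standalone sketch of the naming scheme used by generate_hash in
// controllers/deployment.rs (assumption: the real code hashes the YAML
// serialization of template.spec; a plain string stands in for it here).
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn rs_name(deploy_name: &str, template_yaml: &str, collision_count: i32) -> String {
    let mut hasher = DefaultHasher::new();
    template_yaml.hash(&mut hasher);
    // Salt the hash only when a collision has already been recorded,
    // matching the `collision_count > 0` branch in the controller.
    if collision_count > 0 {
        collision_count.hash(&mut hasher);
    }
    // Hex-encode and keep the first 10 characters, as the controller does.
    let hash: String = format!("{:x}", hasher.finish()).chars().take(10).collect();
    format!("{}-{}", deploy_name, hash)
}

fn main() {
    let template = "containers:\n  - name: nginx\n    image: ./test-image\n";
    // Same template and collision count -> same name on every reconcile,
    // which is what makes create_replicaset idempotent.
    assert_eq!(
        rs_name("nginx-deployment", template, 0),
        rs_name("nginx-deployment", template, 0)
    );
    // Bumping collision_count yields a different name, which is how the
    // controller escapes a collision with an unrelated ReplicaSet.
    assert_ne!(
        rs_name("nginx-deployment", template, 0),
        rs_name("nginx-deployment", template, 1)
    );
    println!("{}", rs_name("nginx-deployment", template, 0));
}

One design observation, offered tentatively: std's DefaultHasher does not guarantee a stable algorithm across Rust releases, so names derived this way are only stable for a given toolchain/build. If ReplicaSet names need to survive binary upgrades, a fixed algorithm (FNV is what upstream Kubernetes uses for pod-template-hash) may be worth considering.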