basic version of deployment #307
base: main
Conversation
Signed-off-by: ichAB <[email protected]>
Pull request overview
This PR implements a basic version of the Deployment resource type for the RK8s Kubernetes-compatible orchestration system. It adds the DeploymentController, which manages ReplicaSets based on pod template specifications, handles scaling operations, and maintains deployment status.
Key Changes
- New `Deployment`, `DeploymentSpec`, and `DeploymentStatus` types in the common library
- `DeploymentController` implementation that reconciles Deployments by creating/managing ReplicaSets
- Integration tests covering basic creation, scaling, idempotency, and concurrent reconciliation scenarios
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 22 comments.
| File | Description |
|---|---|
| project/rks/tests/test-deployment.yaml | Test fixture YAML file for deployment integration tests |
| project/rks/tests/test-deployment.rs | Comprehensive integration tests for deployment controller functionality |
| project/rks/src/main.rs | Registers DeploymentController with the controller manager |
| project/rks/src/controllers/mod.rs | Exports the DeploymentController module |
| project/rks/src/controllers/manager.rs | Adds a deployment watch informer with snapshot and reconnect logic |
| project/rks/src/controllers/deployment.rs | Core DeploymentController implementation with reconciliation logic |
| project/rks/src/api/xlinestore.rs | Adds CRUD operations and watch support for deployments in etcd |
| project/common/src/lib.rs | Defines the Deployment, DeploymentSpec, and DeploymentStatus data structures |
```rust
// Wait for reconciliation
println!("Waiting for deployment reconciliation...");
sleep(Duration::from_secs(3)).await;

// Verify ReplicaSet was created
let replicasets = store.list_replicasets().await?;
let owned_rs: Vec<_> = replicasets
    .iter()
    .filter(|rs| {
        if let Some(owner_refs) = &rs.metadata.owner_references {
            owner_refs.iter().any(|owner| {
                owner.kind == ResourceKind::Deployment && owner.name == "test-deployment"
            })
        } else {
            false
        }
    })
    .collect();
```
Copilot AI commented on Nov 23, 2025:
Using hardcoded sleep durations makes tests fragile and slower than necessary. Consider implementing a polling mechanism with a timeout that checks for the expected condition, which would make tests both faster and more reliable.
Suggested change:

```rust
// Wait for reconciliation by polling for the ReplicaSet to appear
println!("Waiting for deployment reconciliation...");
let start = std::time::Instant::now();
let timeout = Duration::from_secs(10);
let poll_interval = Duration::from_millis(200);
let owned_rs = loop {
    let replicasets = store.list_replicasets().await?;
    // Collect owned ReplicaSets so the result can outlive this loop iteration.
    let owned_rs: Vec<_> = replicasets
        .into_iter()
        .filter(|rs| {
            if let Some(owner_refs) = &rs.metadata.owner_references {
                owner_refs.iter().any(|owner| {
                    owner.kind == ResourceKind::Deployment && owner.name == "test-deployment"
                })
            } else {
                false
            }
        })
        .collect();
    if !owned_rs.is_empty() {
        break owned_rs;
    }
    if start.elapsed() > timeout {
        panic!("Timed out waiting for ReplicaSet to be created for deployment");
    }
    sleep(poll_interval).await;
};
```
```rust
fn create_test_deployment(name: &str, replicas: i32) -> Deployment {
    Deployment {
        api_version: "v1".to_string(),
```
Copilot AI commented on Nov 23, 2025:
The apiVersion is set to "v1" but Deployments should use "apps/v1" to match Kubernetes API conventions.
Suggested change:

```rust
api_version: "apps/v1".to_string(),
```
```rust
if let Some(existing_yaml) = self.store.get_replicaset_yaml(&rs_name).await? {
    let existing_rs: ReplicaSet = serde_yaml::from_str(&existing_yaml)?;

    // Check if it's owned by this deployment
    if let Some(owner_refs) = &existing_rs.metadata.owner_references {
        if owner_refs.iter().any(|owner| {
            owner.kind == ResourceKind::Deployment && owner.uid == deployment.metadata.uid
        }) {
            // Already exists and owned by us, check if template matches
            if self.replicaset_matches_deployment(&existing_rs, deployment) {
                info!(
                    "ReplicaSet {} already exists for deployment {}",
                    rs_name, deploy_name
                );
                return Ok(existing_rs);
            } else {
                // Hash collision: increment collision_count and retry
                info!(
                    "Hash collision detected for {}, incrementing collision_count",
                    rs_name
                );
                self.increment_collision_count(deployment).await?;
                return Err(anyhow!(
                    "Hash collision detected, collision_count incremented, will retry on next reconcile"
                ));
            }
        } else {
            // Owned by different deployment - rare hash collision
            info!(
                "ReplicaSet {} exists but owned by different deployment, incrementing collision_count",
                rs_name
            );
            self.increment_collision_count(deployment).await?;
            return Err(anyhow!(
                "Hash collision with different deployment, collision_count incremented"
            ));
        }
    }
}
```
Copilot AI commented on Nov 23, 2025:
There's a potential race condition between checking if a ReplicaSet exists (line 124) and creating it. Two concurrent reconciliations could both pass the check and attempt to create the same ReplicaSet. Consider using a compare-and-swap operation or handling the "already exists" error gracefully.
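One way to close that window is to make the create itself atomic rather than relying on the read-before-write check. A minimal sketch using an etcd transaction; the free function name, signature, and ReplicaSet key layout are assumptions for illustration, not code from this PR:

```rust
// Sketch only: atomic "create if absent" for a ReplicaSet key.
use etcd_client::{Client, Compare, CompareOp, Txn, TxnOp};

/// Returns Ok(true) if this call created the ReplicaSet, Ok(false) if the key
/// already existed, i.e. a concurrent reconciliation won the race.
pub async fn create_replicaset_yaml_if_absent(
    client: &mut Client,
    name: &str,
    yaml: &str,
) -> Result<bool, etcd_client::Error> {
    let key = format!("/registry/replicasets/{}", name);
    // The compare succeeds only if the key has never been created
    // (create_revision == 0), so two concurrent writers cannot both apply the put.
    let txn = Txn::new()
        .when(vec![Compare::create_revision(key.as_str(), CompareOp::Equal, 0)])
        .and_then(vec![TxnOp::put(key.as_str(), yaml, None)]);
    let resp = client.txn(txn).await?;
    Ok(resp.succeeded())
}
```

The caller could then treat `Ok(false)` as the "already exists" case and fall back to the ownership check shown above.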
```rust
/// Get all deployments as a snapshot with the current revision
pub async fn deployments_snapshot_with_rev(&self) -> Result<(Vec<(String, String)>, i64)> {
    let prefix = "/registry/deployments/";
    let opts = Some(GetOptions::new().with_prefix());
    let mut client = self.client.write().await;
    let resp = client.get(prefix, opts).await?;

    let mut items = Vec::new();
    let rev = resp.header().unwrap().revision();

    for kv in resp.kvs() {
        let key = String::from_utf8_lossy(kv.key()).replace("/registry/deployments/", "");
        let yaml = String::from_utf8_lossy(kv.value()).to_string();
        items.push((key, yaml));
    }

    Ok((items, rev))
}
```
Copilot AI commented on Nov 23, 2025:
Missing documentation for the public method deployments_snapshot_with_rev. Add a doc comment explaining what it returns and when it should be used.
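If the existing one-line comment above is kept, it could be expanded along these lines; the wording is only a suggestion:

```rust
/// Returns a snapshot of every Deployment stored under `/registry/deployments/`,
/// together with the etcd revision at which the snapshot was taken.
///
/// Each item is a `(name, yaml)` pair with the registry prefix stripped from the
/// key. Callers typically pass `rev + 1` to `watch_deployments` so the follow-up
/// watch does not replay events already covered by the snapshot.
```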
```rust
use tokio::sync::RwLock;
use tokio::time::{Duration, sleep};

/// Test the basic creation of a Deployment and its reconciliation to create a ReplicaSet and Pods
#[tokio::test]
async fn test_deployment_basic_create() -> Result<()> {
    // Connect to xline
    let endpoints = vec![
        "http://172.20.0.3:2379".to_string(),
        "http://172.20.0.4:2379".to_string(),
        "http://172.20.0.5:2379".to_string(),
    ];
```
Copilot AI commented on Nov 23, 2025:
Hardcoded endpoints create a dependency on specific network infrastructure. Consider using environment variables or a test helper function to make tests more portable and configurable.
Suggested change:

```rust
use std::env;
use tokio::sync::RwLock;
use tokio::time::{Duration, sleep};

/// Helper to get xline endpoints from env or use default
fn get_xline_endpoints() -> Vec<String> {
    if let Ok(val) = env::var("XLINE_ENDPOINTS") {
        val.split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect()
    } else {
        vec![
            "http://172.20.0.3:2379".to_string(),
            "http://172.20.0.4:2379".to_string(),
            "http://172.20.0.5:2379".to_string(),
        ]
    }
}

#[tokio::test]
async fn test_deployment_basic_create() -> Result<()> {
    // Connect to xline
    let endpoints = get_xline_endpoints();
```
```rust
let yaml = self
    .store
    .get_deployment_yaml(deploy_name)
    .await?
    .ok_or_else(|| anyhow!("Deployment {} not found", deploy_name))?;

let mut deploy: Deployment = serde_yaml::from_str(&yaml)?;
let new_count = deploy.status.collision_count + 1;
deploy.status.collision_count = new_count;

let updated_yaml = serde_yaml::to_string(&deploy)?;
self.store
    .insert_deployment_yaml(deploy_name, &updated_yaml)
    .await?;

info!(
    "Incremented collision_count to {} for deployment {}",
    new_count, deploy_name
);
Ok(())
```
Copilot AI commented on Nov 23, 2025:
There's a race condition in the read-modify-write pattern. Between reading the deployment (lines 234-238) and writing it back (lines 244-247), another reconciliation could update the deployment, causing lost updates. Consider using optimistic locking or versioning.
Suggested change:

```rust
let mut retries = 5;
loop {
    let yaml = self
        .store
        .get_deployment_yaml(deploy_name)
        .await?
        .ok_or_else(|| anyhow!("Deployment {} not found", deploy_name))?;

    let mut deploy: Deployment = serde_yaml::from_str(&yaml)?;
    let current_version = deploy.metadata.resource_version.unwrap_or_default();
    let new_count = deploy.status.collision_count + 1;
    deploy.status.collision_count = new_count;

    // Increment resource_version for optimistic locking
    let new_version = current_version
        .parse::<u64>()
        .unwrap_or(0)
        .saturating_add(1)
        .to_string();
    deploy.metadata.resource_version = Some(new_version.clone());

    let updated_yaml = serde_yaml::to_string(&deploy)?;

    // Attempt compare-and-swap update
    let updated = self
        .store
        .insert_deployment_yaml_if_version(deploy_name, &updated_yaml, &current_version)
        .await?;

    if updated {
        info!(
            "Incremented collision_count to {} for deployment {} (resource_version {})",
            new_count, deploy_name, new_version
        );
        return Ok(());
    } else {
        retries -= 1;
        if retries == 0 {
            return Err(anyhow!(
                "Failed to increment collision_count for deployment {} due to concurrent updates",
                deploy_name
            ));
        }
        // Optionally: add a small delay here (tokio::time::sleep)
        continue;
    }
}
```
```rust
let yaml = self
    .store
    .get_deployment_yaml(deploy_name)
    .await?
    .ok_or_else(|| anyhow!("Deployment {} not found", deploy_name))?;

let mut deploy: Deployment = serde_yaml::from_str(&yaml)?;
deploy.status.replicas = total_replicas;
deploy.status.ready_replicas = ready_replicas;
deploy.status.available_replicas = available_replicas;
deploy.status.updated_replicas = updated_replicas;
deploy.status.unavailable_replicas = (deployment.spec.replicas - available_replicas).max(0);

let updated_yaml = serde_yaml::to_string(&deploy)?;
self.store
    .insert_deployment_yaml(deploy_name, &updated_yaml)
    .await?;

info!(
    "Updated status for deployment {}: replicas={}/{}, ready={}, available={}",
    deploy_name,
    total_replicas,
    deployment.spec.replicas,
    ready_replicas,
    available_replicas
);

Ok(())
```
Copilot AI commented on Nov 23, 2025:
The read-modify-write pattern has the same race condition issue as other status update methods. Between reading the deployment (lines 320-324) and writing it back (lines 333-336), concurrent updates could be lost.
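At the store layer, the version-guarded write that the earlier suggestion assumes (an `insert_deployment_yaml_if_version`-style method) could be backed by an etcd compare-and-swap on the key's mod revision. A rough sketch, with the function name and signature as assumptions rather than code from this PR:

```rust
// Sketch only: write the deployment YAML back only if nobody else has
// modified the key since it was read.
use etcd_client::{Client, Compare, CompareOp, Txn, TxnOp};

/// Returns Ok(true) if the put was applied, Ok(false) if the key's mod
/// revision no longer matches and the caller should re-read and retry.
pub async fn put_deployment_yaml_if_unchanged(
    client: &mut Client,
    name: &str,
    yaml: &str,
    expected_mod_revision: i64,
) -> Result<bool, etcd_client::Error> {
    let key = format!("/registry/deployments/{}", name);
    let txn = Txn::new()
        .when(vec![Compare::mod_revision(
            key.as_str(),
            CompareOp::Equal,
            expected_mod_revision,
        )])
        .and_then(vec![TxnOp::put(key.as_str(), yaml, None)]);
    let resp = client.txn(txn).await?;
    Ok(resp.succeeded())
}
```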
```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
```
Copilot AI commented on Nov 23, 2025:
Missing documentation for the public struct. According to coding guidelines, public APIs should be documented. Add a doc comment explaining what the DeploymentController does and its role in the system.
Suggested change:

```rust
/// Controller responsible for reconciling Deployment resources.
///
/// The `DeploymentController` manages the lifecycle of Deployments in the RK8s control plane,
/// ensuring that the desired state specified by Deployment objects is reflected in the cluster.
/// It handles creation, updates, and deletion of Deployments, and manages associated ReplicaSets
/// to maintain the correct number of pod replicas, similar to the Kubernetes Deployment controller.
```
```rust
    store: Arc<XlineStore>,
}

impl DeploymentController {
```
Copilot AI commented on Nov 23, 2025:
The public constructor new lacks documentation. Add a doc comment describing what it creates and the purpose of the store parameter.
Suggested change:

```rust
impl DeploymentController {
    /// Creates a new `DeploymentController` instance.
    ///
    /// # Arguments
    ///
    /// * `store` - An `Arc` to the shared `XlineStore` used for managing deployment state.
```
```rust
// deployments informer with reconnect loop
let mgr_deploy = self.clone();
let store_deploy = store.clone();
tokio::spawn(async move {
    let mut backoff_ms = 100u64;
    loop {
        match store_deploy.deployments_snapshot_with_rev().await {
            Ok((items, rev)) => {
                for (name, _yaml) in items.into_iter() {
                    let senders = mgr_deploy
                        .get_senders_by_kind(ResourceKind::Deployment)
                        .await;
                    for sender in senders {
                        let _ = sender
                            .send(ResourceWatchResponse {
                                kind: ResourceKind::Deployment,
                                key: name.clone(),
                                event: WatchEvent::Add {
                                    yaml: _yaml.clone(),
                                },
                            })
                            .await;
                    }
                }

                // Start watch from rev+1 to skip snapshot duplication
                match store_deploy.watch_deployments(rev + 1).await {
                    Ok((_watcher, mut stream)) => {
                        backoff_ms = 100;
                        loop {
                            match stream.message().await {
                                Ok(Some(resp)) => {
                                    for ev in resp.events() {
                                        if let Some(kv) = ev.kv() {
                                            let key = String::from_utf8_lossy(kv.key())
                                                .replace("/registry/deployments/", "");
                                            let event_opt = match ev.event_type() {
                                                etcd_client::EventType::Put => {
                                                    if let Some(prev_kv) = ev.prev_kv() {
                                                        Some(WatchEvent::Update {
                                                            old_yaml: String::from_utf8_lossy(
                                                                prev_kv.value(),
                                                            )
                                                            .to_string(),
                                                            new_yaml: String::from_utf8_lossy(
                                                                kv.value(),
                                                            )
                                                            .to_string(),
                                                        })
                                                    } else {
                                                        Some(WatchEvent::Add {
                                                            yaml: String::from_utf8_lossy(
                                                                kv.value(),
                                                            )
                                                            .to_string(),
                                                        })
                                                    }
                                                }
                                                etcd_client::EventType::Delete => {
                                                    if let Some(prev_kv) = ev.prev_kv() {
                                                        Some(WatchEvent::Delete {
                                                            yaml: String::from_utf8_lossy(
                                                                prev_kv.value(),
                                                            )
                                                            .to_string(),
                                                        })
                                                    } else {
                                                        log::warn!(
                                                            "watch delete event missing prev_kv for key {}",
                                                            key
                                                        );
                                                        None
                                                    }
                                                }
                                            };
                                            let Some(event) = event_opt else {
                                                continue;
                                            };
                                            let senders = mgr_deploy
                                                .get_senders_by_kind(ResourceKind::Deployment)
                                                .await;
                                            for sender in senders {
                                                let _ = sender
                                                    .send(ResourceWatchResponse {
                                                        kind: ResourceKind::Deployment,
                                                        key: key.clone(),
                                                        event: event.clone(),
                                                    })
                                                    .await;
                                            }
                                        }
                                    }
                                }
                                Ok(None) => {
                                    log::info!("deployment watch stream closed, will reconnect");
                                    break;
                                }
                                Err(e) => {
                                    log::error!("deployment watch error: {:?}, will reconnect", e);
                                    break;
                                }
                            }
                        }
                    }
                    Err(e) => {
                        log::error!("failed to start deployment watch: {:?}", e);
                    }
                }
            }
            Err(e) => {
                log::error!("failed to snapshot deployments: {:?}", e);
            }
        }
        sleep(Duration::from_millis(backoff_ms)).await;
        backoff_ms = (backoff_ms * 2).min(30_000);
    }
});
```
Copilot AI commented on Nov 23, 2025:
This entire block (lines 932-1057) is nearly identical to the existing watch patterns for other resources in this file. This represents significant code duplication that reduces maintainability. Consider extracting a generic watch_resource helper function that can be parameterized by resource type, prefix, and snapshot/watch methods.
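A rough sketch of the shape such a helper could take, with the resource-specific pieces passed in as parameters; every name here is hypothetical and the fan-out/watch body is elided with comments describing what the duplicated code does today:

```rust
// Sketch only: a generic informer loop shared by pods, replicasets and
// deployments. The snapshot closure is the per-resource part.
use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;

pub async fn run_informer<Snap, SnapFut>(
    kind: ResourceKind,
    key_prefix: &'static str,
    mut snapshot: Snap,
) where
    Snap: FnMut() -> SnapFut,
    SnapFut: Future<Output = anyhow::Result<(Vec<(String, String)>, i64)>>,
{
    let mut backoff_ms = 100u64;
    loop {
        match snapshot().await {
            Ok((items, rev)) => {
                // 1. Send every (name, yaml) pair to subscribers as WatchEvent::Add.
                // 2. Start a watch at rev + 1, strip `key_prefix` from keys, and
                //    translate etcd Put/Delete events into Update/Add/Delete.
                // Both steps are identical across resource kinds today, which is
                // the duplication this helper is meant to remove.
                let _ = (kind, items, rev);
                backoff_ms = 100;
            }
            Err(e) => log::error!("failed to snapshot {}: {:?}", key_prefix, e),
        }
        sleep(Duration::from_millis(backoff_ms)).await;
        backoff_ms = (backoff_ms * 2).min(30_000);
    }
}
```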