From 71d589c02a02e501f8b952e80ba62b8828c2e635 Mon Sep 17 00:00:00 2001
From: oiwn
Date: Wed, 5 Feb 2025 02:22:33 +0700
Subject: [PATCH 1/7] improving workers

---
 capp-queue/src/queue.rs    |   1 -
 capp/Cargo.toml            |   3 +-
 capp/src/manager/worker.rs | 469 +++++++++++++++++++++++++++----------
 3 files changed, 352 insertions(+), 121 deletions(-)

diff --git a/capp-queue/src/queue.rs b/capp-queue/src/queue.rs
index 5f86283..e227a4e 100644
--- a/capp-queue/src/queue.rs
+++ b/capp-queue/src/queue.rs
@@ -18,7 +18,6 @@ where
     async fn pop(&self) -> Result<Task<Data>, TaskQueueError>;
     async fn ack(&self, task_id: &TaskId) -> Result<(), TaskQueueError>;
     async fn nack(&self, task: &Task<Data>) -> Result<(), TaskQueueError>;
-    // NOTE: probably need to move into different trait
     async fn set(&self, task: &Task<Data>) -> Result<(), TaskQueueError>;
 }

diff --git a/capp/Cargo.toml b/capp/Cargo.toml
index 3d8a60a..f3d4532 100644
--- a/capp/Cargo.toml
+++ b/capp/Cargo.toml
@@ -47,10 +47,9 @@
 serde_yaml = "0.9"
 rustis = { version = "0.13", features = ["tokio-runtime"], optional = true }
 mongodb = { version = "3", optional = true }
 sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "json", "uuid", "chrono" ], optional = true }
-tracing-futures = "0.2"

 [dev-dependencies]
-capp = { path = ".", features = ["http", "router", "healthcheck", "mongodb", "redis", "postgres"] }
+capp = { path = ".", features = ["http", "router", "healthcheck", "mongodb", "redis"] }
 hyper = { version = "1.5", features = ["server", "http1"] }
 http-body-util = "0.1"
 bytes = "1.6"
diff --git a/capp/src/manager/worker.rs b/capp/src/manager/worker.rs
index 70fe87c..d6496c5 100644
--- a/capp/src/manager/worker.rs
+++ b/capp/src/manager/worker.rs
@@ -1,6 +1,7 @@
+#![deny(clippy::unwrap_used)]
 use crate::manager::{Computation, WorkerStats};
 use capp_config::Configurable;
-use capp_queue::{AbstractTaskQueue, TaskQueue, TaskQueueError};
+use capp_queue::{AbstractTaskQueue, Task, TaskQueue, TaskQueueError};
 use derive_builder::Builder;
 use serde::{de::DeserializeOwned, Serialize};
 use std::{sync::Arc, time::Duration};
@@ -8,7 +9,7 @@ use tokio::sync::{
     broadcast,
     mpsc::{self, error::TryRecvError},
 };
-use tracing::{debug, error, info, info_span, warn};
+use tracing::{error, info, instrument, warn};

 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct WorkerId(usize);
@@ -22,6 +23,8 @@ pub struct WorkerOptions {
     pub task_limit: Option<usize>,
     #[builder(default = "std::time::Duration::from_secs(5)")]
     pub no_task_found_delay: Duration,
+    #[builder(default = "std::time::Duration::from_secs(60)")]
+    pub computation_timeout: Duration,
 }

 pub enum WorkerCommand {
@@ -83,69 +86,135 @@ where
         // Implement limiting amount of tasks per worker
         if let Some(limit) = self.options.task_limit {
             if self.stats.tasks_processed >= limit {
-                warn!("[{}] task_limit reached: {}", self.worker_id, limit);
+                warn!("task_limit reached: {}", limit);
                 return Ok(false);
             }
         };

         let start_time = std::time::Instant::now();
-        match self.queue.pop().await {
-            Ok(mut task) => {
-                task.set_in_progress();
-                let result = {
-                    self.computation
-                        .call(
-                            self.worker_id,
-                            self.ctx.clone(),
-                            self.queue.clone(),
+
+        // Try to get a task, handle QueueEmpty specially
+        let pop_result = self.queue.pop().await;
+        let mut task = match pop_result {
+            Ok(task) => task,
+            Err(TaskQueueError::QueueEmpty) => {
+                warn!("No tasks found, waiting...");
+                tokio::time::sleep(self.options.no_task_found_delay).await;
+                return Ok(true);
+            }
+            Err(e) => {
+                error!("Error popping task: {:?}", e);
+                // For queue errors, wait a bit then try again
+                tokio::time::sleep(Duration::from_secs(1)).await;
+                return Ok(true);
+            }
+        };
+
+        // Mark task as in progress
+        task.set_in_progress();
+        if let Err(e) = self.queue.set(&task).await {
+            error!("Failed to update task status: {:?}", e);
+            self.handle_error(&mut task, &format!("Queue error: {}", e))
+                .await?;
+            return Ok(true);
+        }
+
+        // Run computation with timeout
+        let computation_timeout = self.options.computation_timeout;
+        let computation_result = tokio::time::timeout(
+            computation_timeout,
+            self.computation.call(
+                self.worker_id,
+                self.ctx.clone(),
+                self.queue.clone(),
+                &mut task,
+            ),
+        )
+        .await;
+
+        match computation_result {
+            // Timeout occurred
+            Err(_elapsed) => {
+                self.handle_error(&mut task, "Computation timeout").await?;
+            }
+            // Computation completed but may have errored
+            Ok(result) => match result {
+                Ok(_) => {
+                    // Successful completion
+                    task.set_succeed();
+                    if let Err(e) = self.queue.set(&task).await {
+                        error!("Failed to update successful task: {:?}", e);
+                        self.handle_error(
                             &mut task,
+                            &format!("Queue error: {}", e),
                         )
-                        .await
-                };
-                match result {
-                    Ok(_) => {
-                        task.set_succeed();
-                        self.queue.set(&task).await.unwrap();
-                        self.queue.ack(&task.task_id).await.unwrap();
-                        info!(
-                            "[{}] Task {} succeed: {:?}",
-                            self.worker_id, &task.task_id, &task.payload
-                        );
-
-                        // record stats on success
-                        self.stats.record_execution_time(start_time.elapsed());
-                        self.stats.record_success();
+                        .await?;
+                        return Ok(true);
                     }
-                    Err(err) => {
-                        task.set_retry(&err.to_string());
-                        if task.retries < self.options.max_retries {
-                            self.queue.push(&task).await.unwrap();
-                            error!(
-                                "[{}] Task {} failed, retrying ({}): {:?}",
-                                self.worker_id, &task.task_id, &task.retries, &err
-                            );
-                        } else {
-                            task.set_dlq("Max retries");
-                            self.queue.nack(&task).await.unwrap();
-                            error!(
-                                "[{}] Task {} failed, max reties ({}): {:?}",
-                                self.worker_id, &task.task_id, &task.retries, &err
-                            );
-                        }
-
-                        self.stats.record_execution_time(start_time.elapsed());
-                        self.stats.record_failure();
+
+                    if let Err(e) = self.queue.ack(&task.task_id).await {
+                        error!("Failed to ack successful task: {:?}", e);
+                        self.handle_error(
+                            &mut task,
+                            &format!("Queue error: {}", e),
+                        )
+                        .await?;
+                        return Ok(true);
                     }
+
+                    info!("Task {} succeed: {:?}", &task.task_id, &task.payload);
+                    self.stats.record_execution_time(start_time.elapsed());
+                    self.stats.record_success();
                 }
+                Err(err) => {
+                    self.handle_error(&mut task, &err.to_string()).await?;
+                }
+            },
+        }
+
+        Ok(true)
+    }
+
+    // Helper method to handle errors uniformly
+    async fn handle_error(
+        &mut self,
+        task: &mut Task<Data>,
+        error_msg: &str,
+    ) -> anyhow::Result<()> {
+        task.set_retry(error_msg);
+        self.stats
+            .record_execution_time(std::time::Instant::now().elapsed());
+        self.stats.record_failure();
+
+        if task.retries < self.options.max_retries {
+            if let Err(e) = self.queue.push(task).await {
+                error!("Failed to push task for retry: {:?}", e);
+                // If we can't push for retry, try to send to DLQ
+                task.set_dlq(&format!("Failed to retry: {}", e));
+                if let Err(e) = self.queue.nack(task).await {
+                    error!("Failed to nack task after retry failure: {:?}", e);
+                }
+            } else {
+                error!(
                    "Task {} failed, retrying ({}/{}): {}",
+                    &task.task_id,
+                    &task.retries,
+                    self.options.max_retries,
+                    error_msg
+                );
             }
-            Err(TaskQueueError::QueueEmpty) => {
-                warn!("[{}] No tasks found, waiting...", self.worker_id);
-                // wait for a while till try to fetch task
-                tokio::time::sleep(self.options.no_task_found_delay).await;
+        } else {
+            task.set_dlq("Max retries exceeded");
+            if let Err(e) = self.queue.nack(task).await {
+                error!("Failed to nack task: {:?}", e);
             }
-            Err(_err) => {}
-        };
-        Ok(true)
+            error!(
+                "Task {} failed, max retries ({}) exceeded: {}",
+                &task.task_id, self.options.max_retries, error_msg
+            );
+        }
+
+        Ok(())
     }
 }
@@ -178,6 +247,7 @@ impl std::fmt::Display for WorkerId {

 /// This wrapper used to create new Worker setup internal logging
 /// and handle comminications with worker
+#[instrument(fields(worker_id = %worker_id), skip(ctx, storage, computation, commands, terminate, worker_options))]
 pub async fn worker_wrapper(
     worker_id: WorkerId,
     ctx: Arc<Ctx>,
     storage: Arc<dyn TaskQueue<Data> + Send + Sync>,
     computation: Arc<Comp>,
     mut commands: mpsc::Receiver<WorkerCommand>,
     mut terminate: broadcast::Receiver<()>,
     worker_options: WorkerOptions,
@@ -242,81 +312,244 @@ pub async fn worker_wrapper(
     info!("completed");
 }

-/// This wrapper used to create new Worker setup internal logging
-/// and handle comminications with worker
-pub async fn worker_wrapper_old(
-    worker_id: WorkerId,
-    ctx: Arc<Ctx>,
-    storage: Arc<dyn TaskQueue<Data> + Send + Sync>,
-    computation: Arc<Comp>,
-    mut commands: mpsc::Receiver<WorkerCommand>,
-    mut terminate: broadcast::Receiver<()>,
-    worker_options: WorkerOptions,
-) where
-    Data: std::fmt::Debug
-        + Clone
-        + Serialize
-        + DeserializeOwned
-        + Send
-        + Sync
-        + 'static,
-    Comp: Computation<Data, Ctx> + Send + Sync + 'static,
-    Ctx: Configurable + Send + Sync + 'static,
-{
-    let mut worker = Worker::new(
-        worker_id,
-        ctx.clone(),
-        storage.clone(),
-        computation.clone(),
-        worker_options,
-    );
-    let mut should_stop = false;
+#[cfg(test)]
+#[allow(clippy::unwrap_used)]
+mod tests {
+    use super::*;
+    use crate::manager::ComputationError;
+    use async_trait::async_trait;
+    use capp_queue::{InMemoryTaskQueue, JsonSerializer};
+    use serde::{Deserialize, Serialize};
+    use std::time::Duration;

-    // setup spans
-    let span = info_span!("worker", _id = %worker_id);
-    let _enter = span.enter();
+    // Test data structures
+    #[derive(Debug, Clone, Serialize, Deserialize)]
+    struct TestData {
+        value: u32,
+        should_timeout: bool,
+        should_fail: bool,
+    }

-    'worker: loop {
-        tokio::select! {
-            _ = terminate.recv() => {
-                tracing::info!("Terminating immediately");
-                return;
-            },
-            run_result = worker.run(), if !should_stop => {
-                match commands.try_recv() {
-                    Ok(WorkerCommand::Shutdown) => {
-                        tracing::warn!("Shutdown received");
-                        should_stop = true;
-                    }
-                    Err(TryRecvError::Disconnected) => break 'worker,
-                    _ => {}
+    struct TestContext {
+        config: serde_yaml::Value,
+    }

-                }
-                // If worker ask to shutdown for some reason
-                // i.e some amount of tasks finished
-                if let Ok(re) = run_result {
-                    if !re {
-                        return;
-                    }
-                }
+    impl Configurable for TestContext {
+        fn config(&self) -> &serde_yaml::Value {
+            &self.config
+        }
+    }
+
+    struct TestComputation {
+        execution_time: Duration,
+    }
+
+    #[async_trait]
+    impl Computation<TestData, TestContext> for TestComputation {
+        async fn call(
+            &self,
+            _worker_id: WorkerId,
+            _ctx: Arc<TestContext>,
+            _queue: AbstractTaskQueue<TestData>,
+            task: &mut Task<TestData>,
+        ) -> Result<(), ComputationError> {
+            // Simulate processing time
+            tokio::time::sleep(self.execution_time).await;
+
+            if task.payload.should_timeout {
+                // This will be caught by the timeout wrapper
+                tokio::time::sleep(Duration::from_secs(60)).await;
             }
-        };
-        // If a stop command was received, finish any ongoing work and then exit.
-        if should_stop {
-            tracing::info!("Completing current task before stopping.",);
-            break;
+            if task.payload.should_fail {
+                return Err(ComputationError::Function(
+                    "Task failed as requested".into(),
+                ));
+            }
+
+            Ok(())
         }
     }

-    tracing::info!("completed");
-}
+    fn setup_test_worker() -> (
+        Worker<TestData, TestComputation, TestContext>,
+        Arc<InMemoryTaskQueue<TestData, JsonSerializer>>,
+    ) {
+        let ctx = Arc::new(TestContext {
+            config: serde_yaml::Value::Mapping(serde_yaml::Mapping::new()),
+        });
+        let queue = Arc::new(InMemoryTaskQueue::<TestData, JsonSerializer>::new());
+        let computation = Arc::new(TestComputation {
+            execution_time: Duration::from_millis(50),
+        });
+        let options = WorkerOptionsBuilder::default()
+            .max_retries(3u32)
+            .computation_timeout(Duration::from_millis(100))
+            .build()
+            .unwrap();
+
+        let worker =
+            Worker::new(WorkerId::new(1), ctx, queue.clone(), computation, options);
+
+        (worker, queue)
+    }
+
+    #[tokio::test]
+    async fn test_successful_task_processing() {
+        let (mut worker, queue) = setup_test_worker();
+
+        // Push a task that should succeed
+        let task = Task::new(TestData {
+            value: 42,
+            should_timeout: false,
+            should_fail: false,
+        });
+        queue.push(&task).await.unwrap();
+
+        // Run the worker
+        let result = worker.run().await;
+        assert!(result.is_ok());
+
+        // Verify stats
+        assert_eq!(worker.stats.tasks_succeeded, 1);
+        assert_eq!(worker.stats.tasks_failed, 0);
+    }
+
+    #[tokio::test]
+    async fn test_task_retry_on_failure() {
+        let (mut worker, queue) = setup_test_worker();
+
+        // Push a task that should fail
+        let task = Task::new(TestData {
+            value: 42,
+            should_timeout: false,
+            should_fail: true,
+        });
+        let task_id = task.task_id;
+        queue.push(&task).await.unwrap();
+
+        // Run the worker
+        let result = worker.run().await;
+        assert!(result.is_ok());
+
+        // Verify stats
+        assert_eq!(worker.stats.tasks_succeeded, 0);
+        assert_eq!(worker.stats.tasks_failed, 1);
+
+        // Verify task was pushed back for retry
+        let retried_task = queue.pop().await.unwrap();
+        assert_eq!(retried_task.task_id, task_id);
+        assert_eq!(retried_task.retries, 1);
+    }
+
+    #[tokio::test]
+    async fn test_task_timeout() {
+        let (mut worker, queue) = setup_test_worker();
+
+        // Push a task that should timeout
+        let task = Task::new(TestData {
+            value: 42,
+            should_timeout: true,
+            should_fail: false,
+        });
+        let task_id = task.task_id;
+        queue.push(&task).await.unwrap();
+
+        // Run the worker
+        let result = worker.run().await;
+        assert!(result.is_ok());
+
+        // Verify stats
+        assert_eq!(worker.stats.tasks_succeeded, 0);
+        assert_eq!(worker.stats.tasks_failed, 1);
+
+        // Verify task was pushed back for retry
+        let retried_task = queue.pop().await.unwrap();
+        assert_eq!(retried_task.task_id, task_id);
+        assert_eq!(retried_task.retries, 1);
+    }
+
+    #[tokio::test]
+    async fn test_max_retries_exceeded() {
+        let (mut worker, queue) = setup_test_worker();
+
+        // Push a task that will fail
+        let task = Task::new(TestData {
+            value: 42,
+            should_timeout: false,
+            should_fail: true,
+        });
+        // let task_id = task.task_id;
+        queue.push(&task).await.unwrap();
+
+        // Run worker multiple times to exceed max retries
+        for _ in 0..4 {
+            // Original try + 3 retries
+            let result = worker.run().await;
+            assert!(result.is_ok());
+        }
+
+        // Verify stats
+        assert_eq!(worker.stats.tasks_succeeded, 0);
+        assert_eq!(worker.stats.tasks_failed, 3); // 3 retries
+
+        // Verify task is not in main queue
+        assert!(matches!(queue.pop().await, Err(TaskQueueError::QueueEmpty)));
+
+        // Verify task was moved to DLQ (note: would need to add DLQ check functionality)
+    }
+
+    #[tokio::test]
+    async fn test_task_limit() {
+        let ctx = Arc::new(TestContext {
+            config: serde_yaml::Value::Mapping(serde_yaml::Mapping::new()),
+        });
+        let queue = Arc::new(InMemoryTaskQueue::<TestData, JsonSerializer>::new());
+        let computation = Arc::new(TestComputation {
+            execution_time: Duration::from_millis(50),
+        });
+        let options = WorkerOptionsBuilder::default()
+            .max_retries(3u32)
+            .task_limit(Some(2))
+            .build()
+            .unwrap();
+
+        let mut worker =
+            Worker::new(WorkerId::new(1), ctx, queue.clone(), computation, options);
+
+        // Push 3 tasks
+        for i in 0..3 {
+            let task = Task::new(TestData {
+                value: i,
+                should_timeout: false,
+                should_fail: false,
+            });
+            queue.push(&task).await.unwrap();
+        }
+
+        // Run worker until task limit is reached
+        let mut runs = 0;
+        while let Ok(true) = worker.run().await {
+            runs += 1;
+        }
+
+        assert_eq!(runs, 2); // Should process exactly 2 tasks
+        assert_eq!(worker.stats.tasks_processed, 2);
+    }
+
+    #[tokio::test]
+    async fn test_empty_queue_handling() {
+        let (mut worker, _queue) = setup_test_worker();
+
+        // Run worker with empty queue
+        let result = worker.run().await;
+        assert!(result.is_ok());
+
+        // Verify stats unchanged
+        assert_eq!(worker.stats.tasks_processed, 0);
+        assert_eq!(worker.stats.tasks_succeeded, 0);
+        assert_eq!(worker.stats.tasks_failed, 0);
+    }

     #[test]
     fn worker_options() {

From e78519d17c691f3342c79cc1309abafed82ac659 Mon Sep 17 00:00:00 2001
From: oiwn
Date: Wed, 5 Feb 2025 02:29:18 +0700
Subject: [PATCH 2/7] fix workflow

---
 .github/workflows/test.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 043c8ce..512f78a 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -65,7 +65,8 @@ jobs:
           key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

       - name: Install sqlx-cli
-        run: cargo install sqlx-cli --no-default-features --features postgres
+        # run: cargo install sqlx-cli --no-default-features --features postgres
+        run: cargo install sqlx-cli --no-default-features --features postgres --force

       - name: Run database migrations
         run: cargo sqlx migrate run

From 807bc3011b35b8c79d9d1e445213fe11ba05174e Mon Sep 17 00:00:00 2001
From: oiwn
Date: Wed, 5 Feb 2025 02:31:25 +0700
Subject: [PATCH 3/7] fixing workflow

---
 .github/workflows/platform-coverage.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/platform-coverage.yml b/.github/workflows/platform-coverage.yml
index f9e5247..e0f4651 100644
--- a/.github/workflows/platform-coverage.yml
+++ b/.github/workflows/platform-coverage.yml
@@ -91,7 +91,8 @@ jobs:
           key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

       - name: Install sqlx-cli
-        run: cargo install sqlx-cli --no-default-features --features postgres
+        # run: cargo install sqlx-cli --no-default-features --features postgres
+        run: cargo install sqlx-cli --no-default-features --features postgres --force

       - name: Run database migrations
         run: cargo sqlx migrate run

From 7320909fabedba9f758a43251588113a9c0f8b73 Mon Sep 17 00:00:00 2001
From: oiwn
Date: Wed, 5 Feb 2025 02:43:51 +0700
Subject: [PATCH 4/7] suppressing audit security checks

---
 audit.toml | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 audit.toml

diff --git a/audit.toml b/audit.toml
new file mode 100644
index 0000000..ade21fc
--- /dev/null
+++ b/audit.toml
@@ -0,0 +1,4 @@
+ignore = [
+    { id = "RUSTSEC-2025-0003", reason = "It's ok for now"}
+    { id = "RUSTSEC-2024-0384", reason = "Instant is unmaintained but safe in our usage" }
+]

From 770c3e6a607876e6b237100d31d0d75e6ab59931 Mon Sep 17 00:00:00 2001
From: oiwn
Date: Wed, 5 Feb 2025 12:13:42 +0700
Subject: [PATCH 5/7] fixing workflow for audit

---
 audit.toml => .cargo/audit.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
 rename audit.toml => .cargo/audit.toml (63%)

diff --git a/audit.toml b/.cargo/audit.toml
similarity index 63%
rename from audit.toml
rename to .cargo/audit.toml
index ade21fc..200722a 100644
--- a/audit.toml
+++ b/.cargo/audit.toml
@@ -1,4 +1,4 @@
 ignore = [
-    { id = "RUSTSEC-2025-0003", reason = "It's ok for now"}
+    { id = "RUSTSEC-2025-0003", reason = "It's ok for now"},
     { id = "RUSTSEC-2024-0384", reason = "Instant is unmaintained but safe in our usage" }
 ]

From c93935857c809d1239104070d82abd8c7fe3b247 Mon Sep 17 00:00:00 2001
From: oiwn
Date: Wed, 5 Feb 2025 12:18:38 +0700
Subject: [PATCH 6/7] still fixing workflow

---
 .cargo/audit.toml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/.cargo/audit.toml b/.cargo/audit.toml
index 200722a..8865ae4 100644
--- a/.cargo/audit.toml
+++ b/.cargo/audit.toml
@@ -1,4 +1,3 @@
 ignore = [
-    { id = "RUSTSEC-2025-0003", reason = "It's ok for now"},
-    { id = "RUSTSEC-2024-0384", reason = "Instant is unmaintained but safe in our usage" }
+    "RUSTSEC-2025-0003", "RUSTSEC-2024-0384"
 ]

From 3199c3697e4721de2b09857eef8d0e8a065ad399 Mon Sep 17 00:00:00 2001
From: oiwn
Date: Wed, 5 Feb 2025 12:22:28 +0700
Subject: [PATCH 7/7] still fixing audit

---
 .cargo/audit.toml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.cargo/audit.toml b/.cargo/audit.toml
index 8865ae4..d9e2e05 100644
--- a/.cargo/audit.toml
+++ b/.cargo/audit.toml
@@ -1,3 +1,4 @@
+[advisories]
 ignore = [
     "RUSTSEC-2025-0003", "RUSTSEC-2024-0384"
 ]