From e16c67eb67f77447fbe0ff3b0c0753a18f1193bd Mon Sep 17 00:00:00 2001 From: oiwn Date: Sat, 14 Dec 2024 23:42:16 +0700 Subject: [PATCH] add failing tests for mongodb --- capp-queue/src/backend/mongodb.rs | 149 ++++++++------- capp-queue/tests/mongodb_tests.rs | 305 ++++++++++++++++++++++++++++++ capp/Cargo.toml | 8 +- capp/src/lib.rs | 2 + 4 files changed, 396 insertions(+), 68 deletions(-) create mode 100644 capp-queue/tests/mongodb_tests.rs diff --git a/capp-queue/src/backend/mongodb.rs b/capp-queue/src/backend/mongodb.rs index ad2d282..ce12d02 100644 --- a/capp-queue/src/backend/mongodb.rs +++ b/capp-queue/src/backend/mongodb.rs @@ -1,23 +1,29 @@ use async_trait::async_trait; use mongodb::{ - bson::{doc, Document}, - options::{ClientOptions, FindOneAndDeleteOptions}, - Client, Collection, + bson::doc, + error::TRANSIENT_TRANSACTION_ERROR, + error::UNKNOWN_TRANSACTION_COMMIT_RESULT, + options::{ClientOptions, IndexOptions}, + Client, ClientSession, Collection, IndexModel, }; use serde::{de::DeserializeOwned, Serialize}; -use std::marker::PhantomData; use crate::queue::{TaskQueue, TaskQueueError}; use crate::task::{Task, TaskId}; -pub struct MongoTaskQueue { +pub struct MongoTaskQueue +where + D: Send + Sync + 'static, +{ pub client: Client, - pub tasks_collection: Collection, - pub dlq_collection: Collection, - _marker: PhantomData, + pub tasks_collection: Collection>, + pub dlq_collection: Collection>, } -impl MongoTaskQueue { +impl MongoTaskQueue +where + D: Clone + Serialize + DeserializeOwned + Send + Sync + 'static, +{ pub async fn new( connection_string: &str, queue_name: &str, @@ -30,12 +36,19 @@ impl MongoTaskQueue { .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; let db = client.database("task_queue"); - let tasks_collection = db.collection(&format!("{}_tasks", queue_name)); - let dlq_collection = db.collection(&format!("{}_dlq", queue_name)); + let tasks_collection = + db.collection::>(&format!("{}_tasks", queue_name)); + let dlq_collection = + db.collection::>(&format!("{}_dlq", queue_name)); + + // Create index on task_id + let index_model = IndexModel::builder() + .keys(doc! { "task_id": 1 }) + .options(IndexOptions::builder().unique(true).build()) + .build(); - // Create indexes tasks_collection - .create_index(doc! { "task_id": 1 }, None) + .create_index(index_model) .await .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; @@ -43,11 +56,45 @@ impl MongoTaskQueue { client, tasks_collection, dlq_collection, - _marker: PhantomData, }) } } +impl MongoTaskQueue +where + D: Clone + Serialize + DeserializeOwned + Send + Sync + 'static, +{ + // Helper method to execute the nack transaction + async fn execute_nack_transaction( + &self, + task: &Task, + session: &mut ClientSession, + ) -> mongodb::error::Result<()> { + // Move to DLQ + self.dlq_collection + .insert_one(task) + .session(&mut *session) + .await?; + + // Remove from main queue + self.tasks_collection + .delete_one(doc! { "task_id": task.task_id.to_string() }) + .session(&mut *session) + .await?; + + // Commit with retry logic for unknown commit results + loop { + let result = session.commit_transaction().await; + if let Err(ref error) = result { + if error.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { + continue; + } + } + return result; + } + } +} + #[async_trait] impl TaskQueue for MongoTaskQueue where @@ -60,11 +107,8 @@ where + 'static, { async fn push(&self, task: &Task) -> Result<(), TaskQueueError> { - let task_doc = mongodb::bson::to_document(&task) - .map_err(|e| TaskQueueError::SerdeError(e.to_string()))?; - self.tasks_collection - .insert_one(task_doc, None) + .insert_one(task) .await .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; @@ -72,20 +116,13 @@ where } async fn pop(&self) -> Result, TaskQueueError> { - let options = FindOneAndDeleteOptions::default(); - - let result = self + match self .tasks_collection - .find_one_and_delete(doc! {}, options) + .find_one_and_delete(doc! {}) .await - .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; - - match result { - Some(doc) => { - let task: Task = mongodb::bson::from_document(doc) - .map_err(|e| TaskQueueError::SerdeError(e.to_string()))?; - Ok(task) - } + .map_err(|e| TaskQueueError::QueueError(e.to_string()))? + { + Some(task) => Ok(task), None => Err(TaskQueueError::QueueEmpty), } } @@ -93,7 +130,7 @@ where async fn ack(&self, task_id: &TaskId) -> Result<(), TaskQueueError> { let result = self .tasks_collection - .delete_one(doc! { "task_id": task_id.to_string() }, None) + .delete_one(doc! { "task_id": task_id.to_string() }) .await .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; @@ -105,54 +142,38 @@ where } async fn nack(&self, task: &Task) -> Result<(), TaskQueueError> { - let task_doc = mongodb::bson::to_document(&task) - .map_err(|e| TaskQueueError::SerdeError(e.to_string()))?; - - // Start session for transaction let mut session = self .client - .start_session(None) + .start_session() .await .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; + // Configure transaction options with majority read/write concerns session - .start_transaction(None) - .await - .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; - - // Move to DLQ and remove from main queue - self.dlq_collection - .insert_one_with_session(task_doc, None, &mut session) - .await - .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; - - self.tasks_collection - .delete_one_with_session( - doc! { "task_id": task.task_id.to_string() }, - None, - &mut session, - ) + .start_transaction() .await .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; - session - .commit_transaction() - .await - .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; + // Execute transaction with retry logic for transient errors + while let Err(error) = + self.execute_nack_transaction(task, &mut session).await + { + if !error.contains_label(TRANSIENT_TRANSACTION_ERROR) { + return Err(TaskQueueError::QueueError(error.to_string())); + } + // Retry transaction on transient errors + session + .start_transaction() + .await + .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; + } Ok(()) } async fn set(&self, task: &Task) -> Result<(), TaskQueueError> { - let task_doc = mongodb::bson::to_document(&task) - .map_err(|e| TaskQueueError::SerdeError(e.to_string()))?; - self.tasks_collection - .replace_one( - doc! { "task_id": task.task_id.to_string() }, - task_doc, - None, - ) + .replace_one(doc! { "task_id": task.task_id.to_string() }, task) .await .map_err(|e| TaskQueueError::QueueError(e.to_string()))?; diff --git a/capp-queue/tests/mongodb_tests.rs b/capp-queue/tests/mongodb_tests.rs new file mode 100644 index 0000000..0cb1daf --- /dev/null +++ b/capp-queue/tests/mongodb_tests.rs @@ -0,0 +1,305 @@ +#[cfg(test)] +mod tests { + use capp_queue::queue::{MongoTaskQueue, TaskQueue, TaskQueueError}; + use capp_queue::task::Task; + use dotenvy::dotenv; + use mongodb::{bson::doc, options::ClientOptions, Client}; + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] + struct TestData { + value: u32, + } + + async fn get_mongo_connection() -> String { + dotenv().ok(); + std::env::var("MONGODB_URI").expect("Set MONGODB_URI env variable") + } + + async fn cleanup_collections(name: &str) { + let uri = get_mongo_connection().await; + let client_options = ClientOptions::parse(&uri) + .await + .expect("Failed to parse MongoDB options"); + let client = Client::with_options(client_options.clone()) + .expect("Failed to create MongoDB client"); + + // Get database name from URI or use default + let db_name = client_options + .default_database + .as_ref() + .expect("No database specified in MongoDB URI"); + + let db = client.database(db_name); + + // Drop collections if they exist + let _ = db + .collection::>(&format!("{}_tasks", name)) + .drop() + .await; + let _ = db + .collection::>(&format!("{}_dlq", name)) + .drop() + .await; + } + + async fn setup_queue(name: &str) -> MongoTaskQueue { + cleanup_collections(name).await; + let uri = get_mongo_connection().await; + MongoTaskQueue::new(&uri, name) + .await + .expect("Failed to create MongoTaskQueue") + } + + #[tokio::test] + async fn test_typical_workflow() { + let queue = setup_queue("capp-test-workflow").await; + + // Test push and pop + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + queue.push(&task).await.expect("Failed to push task"); + + let popped_task = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped_task.payload.value, 42); + assert_eq!(popped_task.task_id, task_id); // Verify task ID matches + + // Test ack + let acked = queue.ack(&popped_task.task_id).await; + assert!(acked.is_ok(), "Failed to ack task: {:?}", acked); + + // Verify queue is empty + assert!(matches!(queue.pop().await, Err(TaskQueueError::QueueEmpty))); + + // Test multiple tasks + let task_1 = Task::new(TestData { value: 1 }); + let task_2 = Task::new(TestData { value: 2 }); + queue.push(&task_1).await.expect("Failed to push task 1"); + queue.push(&task_2).await.expect("Failed to push task 2"); + + let popped = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped.payload.value, 1); + queue + .ack(&popped.task_id) + .await + .expect("Failed to ack task"); + + let popped = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped.payload.value, 2); + queue.nack(&popped).await.expect("Failed to nack task"); + } + + #[tokio::test] + async fn test_push_and_pop() { + let queue = setup_queue("capp-test-push-pop").await; + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + + queue.push(&task).await.expect("Failed to push task"); + + let popped_task = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped_task.payload.value, 42); + assert_eq!(popped_task.task_id, task_id); + } + + #[tokio::test] + async fn test_push_multiple_and_pop_order() { + let queue = setup_queue("capp-test-push-pop-order").await; + let tasks = vec![ + Task::new(TestData { value: 1 }), + Task::new(TestData { value: 2 }), + Task::new(TestData { value: 3 }), + ]; + + for task in &tasks { + queue.push(task).await.expect("Failed to push task"); + } + + for expected_value in 1..=3 { + let popped_task = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped_task.payload.value, expected_value); + } + + assert!(matches!(queue.pop().await, Err(TaskQueueError::QueueEmpty))); + } + + #[tokio::test] + async fn test_ack() { + let queue = setup_queue("capp-test-ack").await; + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + + queue.push(&task).await.expect("Failed to push task"); + let popped_task = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped_task.task_id, task_id); // Verify we got the right task + + queue + .ack(&popped_task.task_id) + .await + .expect("Failed to ack task"); + + assert!(matches!(queue.pop().await, Err(TaskQueueError::QueueEmpty))); + } + + #[tokio::test] + async fn test_nack() { + let queue = setup_queue("capp-test-nack").await; + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + + queue.push(&task).await.expect("Failed to push task"); + let popped_task = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped_task.task_id, task_id); + + // Disable retryable writes for transactions + queue.nack(&popped_task).await.expect("Failed to nack task"); + + assert!(matches!(queue.pop().await, Err(TaskQueueError::QueueEmpty))); + } + + #[tokio::test] + async fn test_set() { + let queue = setup_queue("capp-test-set").await; + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + + queue.push(&task).await.expect("Failed to push task"); + + let mut popped_task = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped_task.task_id, task_id); + + popped_task.payload.value = 43; + queue.set(&popped_task).await.expect("Failed to set task"); + + let updated_task = queue.pop().await.expect("Failed to get updated task"); + assert_eq!(updated_task.payload.value, 43); + } + + #[tokio::test] + async fn test_queue_empty() { + let queue = setup_queue("capp-test-empty").await; + assert!(matches!(queue.pop().await, Err(TaskQueueError::QueueEmpty))); + } + + #[tokio::test] + async fn test_task_persistence() { + let queue = setup_queue("capp-test-persistence").await; + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + + // Test push + queue.push(&task).await.expect("Failed to push task"); + + // Verify task exists in MongoDB directly + let raw_task = queue + .tasks_collection + .find_one(doc! { "task_id": task_id.to_string() }) + .await + .expect("Failed to query task") + .expect("Task not found in collection"); + + assert_eq!(raw_task.task_id, task_id); + assert_eq!(raw_task.payload.value, 42); + } + + #[tokio::test] + async fn test_task_removal() { + let queue = setup_queue("capp-test-removal").await; + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + + // Push and verify task exists + queue.push(&task).await.expect("Failed to push task"); + + let exists_before = queue + .tasks_collection + .find_one(doc! { "task_id": task_id.to_string() }) + .await + .expect("Failed to query task"); + assert!(exists_before.is_some(), "Task should exist before pop"); + + // Pop and verify task is gone + let popped = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped.task_id, task_id); + + let exists_after = queue + .tasks_collection + .find_one(doc! { "task_id": task_id.to_string() }) + .await + .expect("Failed to query task"); + assert!(exists_after.is_none(), "Task should be removed after pop"); + } + + #[tokio::test] + async fn test_nack_task_moves_to_dlq() { + let queue = setup_queue("capp-test-nack-dlq").await; + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + + // Push task + queue.push(&task).await.expect("Failed to push task"); + + // Pop task + let popped = queue.pop().await.expect("Failed to pop task"); + assert_eq!(popped.task_id, task_id); + + // Try to nack without transactions first + let nack_result = queue.nack(&popped).await; + println!("Nack result: {:?}", nack_result); + + // Verify task moved to DLQ + let in_dlq = queue + .dlq_collection + .find_one(doc! { "task_id": task_id.to_string() }) + .await + .expect("Failed to query DLQ"); + + if let Some(dlq_task) = in_dlq { + println!("Task found in DLQ: {:?}", dlq_task); + } else { + println!("Task not found in DLQ"); + } + + // Verify removed from main queue + let in_main = queue + .tasks_collection + .find_one(doc! { "task_id": task_id.to_string() }) + .await + .expect("Failed to query main queue"); + + if let Some(main_task) = in_main { + println!("Task still in main queue: {:?}", main_task); + } else { + println!("Task not in main queue"); + } + } + + #[tokio::test] + async fn test_task_set_update() { + let queue = setup_queue("capp-test-set-update").await; + let task = Task::new(TestData { value: 42 }); + let task_id = task.task_id; + + // Push initial task + queue.push(&task).await.expect("Failed to push task"); + + // Update task via set + let mut updated_task = task.clone(); + updated_task.payload.value = 43; + queue.set(&updated_task).await.expect("Failed to set task"); + + // Verify update directly in MongoDB + let task_in_db = queue + .tasks_collection + .find_one(doc! { "task_id": task_id.to_string() }) + .await + .expect("Failed to query task") + .expect("Task not found after update"); + + println!("Original task: {:?}", task); + println!("Updated task: {:?}", updated_task); + println!("Task in DB: {:?}", task_in_db); + + assert_eq!(task_in_db.payload.value, 43); + } +} diff --git a/capp/Cargo.toml b/capp/Cargo.toml index daedc4f..06b560a 100644 --- a/capp/Cargo.toml +++ b/capp/Cargo.toml @@ -44,8 +44,8 @@ capp-queue = { path = "../capp-queue" } derive_builder = { version = "0.20" } reqwest = { version = "0.12", features = ["gzip", "rustls-tls", "json"], optional = true } serde_yaml = "0.9" -# rustis = { version = "0.13", features = ["tokio-runtime"], optional = true } -# mongodb = { version = "3", features = ["tokio-runtime"], optional = true } +rustis = { version = "0.13", features = ["tokio-runtime"], optional = true } +mongodb = { version = "3", optional = true } tracing-futures = "0.2" [dev-dependencies] @@ -66,5 +66,5 @@ tempfile = "3" http = ["dep:reqwest", "capp-config/http"] router = ["capp-config/router"] healthcheck = ["dep:reqwest"] -redis = ["capp-queue/redis"] -mongodb = ["capp-queue/mongodb"] +redis = ["capp-queue/redis", "dep:rustis"] +mongodb = ["capp-queue/mongodb", "dep:mongodb"] diff --git a/capp/src/lib.rs b/capp/src/lib.rs index 537610b..6aaf5a6 100644 --- a/capp/src/lib.rs +++ b/capp/src/lib.rs @@ -49,6 +49,8 @@ pub mod prelude; pub use async_trait; #[cfg(feature = "http")] pub use derive_builder; +#[cfg(feature = "mongodb")] +pub use mongodb; #[cfg(feature = "http")] pub use reqwest; #[cfg(feature = "redis")]