diff --git a/capp-queue/Cargo.toml b/capp-queue/Cargo.toml index 974953d..6c47d69 100644 --- a/capp-queue/Cargo.toml +++ b/capp-queue/Cargo.toml @@ -24,6 +24,7 @@ tokio = { version = "1.42", features = ["full", "test-util"] } serial_test = "3" criterion = { version = "0.5", features = ["html_reports"] } fake = { version = "3.0.1", features = ["derive", "url"] } +mongodb = { version = "3" } [features] @@ -44,6 +45,6 @@ harness = false name = "mongo_bench" harness = false -[[bench]] -name = "postgres_bench" -harness = false +# [[bench]] +# name = "postgres_bench" +# harness = false diff --git a/capp-queue/benches/mongo_bench.rs b/capp-queue/benches/mongo_bench.rs index e7e3b59..53abbc1 100644 --- a/capp-queue/benches/mongo_bench.rs +++ b/capp-queue/benches/mongo_bench.rs @@ -3,7 +3,7 @@ mod common; use capp_queue::{backend::mongodb::BsonSerializer, MongoTaskQueue, TaskQueue}; use criterion::{criterion_group, criterion_main, Criterion}; use dotenvy::dotenv; -// use mongodb::{options::ClientOptions, Client}; +use mongodb::{options::ClientOptions, Client}; use tokio::runtime::Runtime; use capp_queue::task::Task; @@ -19,7 +19,17 @@ async fn get_mongo_connection() -> String { async fn setup_queue() -> MongoTaskQueue { let uri = get_mongo_connection().await; - let queue = MongoTaskQueue::new(&uri, QUEUE_NAME) + let client_options = ClientOptions::parse(&uri) + .await + .expect("Failed to parse options"); + let client = Client::with_options(client_options.clone()) + .expect("Failed to create client"); + let db_name = client_options + .default_database + .as_ref() + .expect("No database specified"); + let database = client.database(db_name); + let queue = MongoTaskQueue::new(database, QUEUE_NAME) .await .expect("Failed to create MongoTaskQueue"); diff --git a/capp-queue/src/backend/mongodb.rs b/capp-queue/src/backend/mongodb.rs index 8660d6b..b53dc3b 100644 --- a/capp-queue/src/backend/mongodb.rs +++ b/capp-queue/src/backend/mongodb.rs @@ -2,8 +2,9 @@ use crate::{Task, TaskId, TaskQueue, TaskQueueError, TaskSerializer}; use async_trait::async_trait; use mongodb::{ bson::{self, doc}, - options::ClientOptions, - Client, Collection, + // options::ClientOptions, + Client, + Collection, }; use serde::{de::DeserializeOwned, Serialize}; use std::marker::PhantomData; @@ -23,7 +24,7 @@ where D: Send + Sync + 'static, S: TaskSerializer + Send + Sync, { - pub async fn new( + /* pub async fn new( connection_string: &str, queue_name: &str, ) -> Result { @@ -49,6 +50,23 @@ where dlq_collection, _marker: PhantomData, }) + } */ + + pub async fn new( + database: mongodb::Database, + queue_name: &str, + ) -> Result { + // Collections store raw BSON documents now + let tasks_collection = + database.collection(&format!("{}_tasks", queue_name)); + let dlq_collection = database.collection(&format!("{}_dlq", queue_name)); + + Ok(Self { + client: database.client().clone(), + tasks_collection, + dlq_collection, + _marker: PhantomData, + }) } } diff --git a/capp-queue/tests/mongodb_tests.rs b/capp-queue/tests/mongodb_tests.rs index a8fcf88..a100833 100644 --- a/capp-queue/tests/mongodb_tests.rs +++ b/capp-queue/tests/mongodb_tests.rs @@ -7,7 +7,7 @@ mod tests { }; use dotenvy::dotenv; use mongodb::bson::{self, doc}; - use mongodb::{options::ClientOptions, Client}; + use mongodb::{options::ClientOptions, Client, Database}; use serde::{Deserialize, Serialize}; use std::time::Duration; @@ -16,46 +16,45 @@ mod tests { value: u32, } - async fn get_mongo_connection() -> String { + async fn get_mongodb() -> Database { dotenv().ok(); - std::env::var("MONGODB_URI").expect("Set MONGODB_URI env variable") + let uri = + std::env::var("MONGODB_URI").expect("Set MONGODB_URI env variable"); + let client_options = ClientOptions::parse(&uri) + .await + .expect("Failed to parse options"); + let client = Client::with_options(client_options.clone()) + .expect("Failed to create client"); + let db_name = client_options + .default_database + .as_ref() + .expect("No database specified"); + let database = client.database(db_name); + database } async fn verify_collection_exists( - client: &Client, - db_name: &str, + db: &Database, collection_name: &str, ) -> bool { - let db = client.database(db_name); let collections = db.list_collection_names().await.unwrap(); collections.contains(&collection_name.to_string()) } async fn cleanup_collections(name: &str) -> Result<(), mongodb::error::Error> { - let uri = get_mongo_connection().await; - let client_options = ClientOptions::parse(&uri).await?; - let client = Client::with_options(client_options.clone())?; - - let db_name = client_options - .default_database - .as_ref() - .expect("No database specified in MongoDB URI"); - - let db = client.database(db_name); - + let db = get_mongodb().await; let tasks_collection_name = format!("{}_tasks", name); let dlq_collection_name = format!("{}_dlq", name); // Check if collections exist before dropping - if verify_collection_exists(&client, db_name, &tasks_collection_name).await - { + if verify_collection_exists(&db, &tasks_collection_name).await { tracing::info!("Dropping collection: {}", tasks_collection_name); db.collection::>(&tasks_collection_name) .drop() .await?; } - if verify_collection_exists(&client, db_name, &dlq_collection_name).await { + if verify_collection_exists(&db, &dlq_collection_name).await { tracing::info!("Dropping collection: {}", dlq_collection_name); db.collection::>(&dlq_collection_name) .drop() @@ -70,8 +69,10 @@ mod tests { tracing::error!("Cleanup failed: {:?}", e); } tokio::time::sleep(Duration::from_millis(100)).await; - let uri = get_mongo_connection().await; - MongoTaskQueue::new(&uri, name) + + let db = get_mongodb().await; + + MongoTaskQueue::new(db, name) .await .expect("Failed to create MongoTaskQueue") } @@ -81,11 +82,12 @@ mod tests { // Setup test queue let queue_name = "test_push"; let task = Task::new(TestData { value: 1 }); - let uri = get_mongo_connection().await; - let queue = - MongoTaskQueue::::new(&uri, queue_name) - .await - .expect("Failed to create MongoTaskQueue"); + + // Get connection string and create database instance + let db = get_mongodb().await; + let queue = MongoTaskQueue::::new(db, queue_name) + .await + .expect("Failed to create MongoTaskQueue"); // Push task queue.push(&task).await.expect("Failed to push task"); @@ -109,11 +111,11 @@ mod tests { #[tokio::test] async fn test_push_and_pop() { let queue_name = "test_push_pop"; - let uri = get_mongo_connection().await; - let queue = - MongoTaskQueue::::new(&uri, queue_name) - .await - .expect("Failed to create MongoTaskQueue"); + let db = get_mongodb().await; + + let queue = MongoTaskQueue::::new(db, queue_name) + .await + .expect("Failed to create MongoTaskQueue"); let task = Task::new(TestData { value: 42 }); let original_id = task.task_id; @@ -136,12 +138,10 @@ mod tests { #[tokio::test] async fn test_pop_status_handling() { let queue_name = "test_pop_status"; - let queue = MongoTaskQueue::::new( - &get_mongo_connection().await, - queue_name, - ) - .await - .expect("Failed to create MongoTaskQueue"); + let db = get_mongodb().await; + let queue = MongoTaskQueue::::new(db, queue_name) + .await + .expect("Failed to create MongoTaskQueue"); // Push a task let task = Task::new(TestData { value: 42 }); @@ -177,12 +177,10 @@ mod tests { #[tokio::test] async fn test_push_pop_order() { let queue_name = "test_push_pop_order"; - let queue = MongoTaskQueue::::new( - &get_mongo_connection().await, - queue_name, - ) - .await - .expect("Failed to create MongoTaskQueue"); + let db = get_mongodb().await; + let queue = MongoTaskQueue::::new(db, queue_name) + .await + .expect("Failed to create MongoTaskQueue"); // Push multiple tasks in sequence let tasks = vec![ @@ -215,12 +213,10 @@ mod tests { #[tokio::test] async fn test_ack() { let queue_name = "test_ack"; - let queue = MongoTaskQueue::::new( - &get_mongo_connection().await, - queue_name, - ) - .await - .expect("Failed to create MongoTaskQueue"); + let db = get_mongodb().await; + let queue = MongoTaskQueue::::new(db, queue_name) + .await + .expect("Failed to create MongoTaskQueue"); // Push and pop a task let task = Task::new(TestData { value: 42 }); @@ -255,12 +251,10 @@ mod tests { #[tokio::test] async fn test_set() { let queue_name = "test_set"; - let queue = MongoTaskQueue::::new( - &get_mongo_connection().await, - queue_name, - ) - .await - .expect("Failed to create MongoTaskQueue"); + let db = get_mongodb().await; + let queue = MongoTaskQueue::::new(db, queue_name) + .await + .expect("Failed to create MongoTaskQueue"); // Create and push initial task let mut task = Task::new(TestData { value: 42 }); @@ -341,12 +335,10 @@ mod tests { #[tokio::test] async fn test_nack() { let queue_name = "test_nack"; - let queue = MongoTaskQueue::::new( - &get_mongo_connection().await, - queue_name, - ) - .await - .expect("Failed to create MongoTaskQueue"); + let db = get_mongodb().await; + let queue = MongoTaskQueue::::new(db, queue_name) + .await + .expect("Failed to create MongoTaskQueue"); // Push and pop a task let task = Task::new(TestData { value: 42 }); diff --git a/capp/Cargo.toml b/capp/Cargo.toml index f3d4532..46f4ca1 100644 --- a/capp/Cargo.toml +++ b/capp/Cargo.toml @@ -60,7 +60,6 @@ rand = "0.8" md5 = "0.7" url = "2.5" base64 = "0.22" -# tempfile = "3" [features] http = ["dep:reqwest", "capp-config/http"]