Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongodb backend now accespt Database instead of uri #64

Merged
merged 1 commit into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions capp-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tokio = { version = "1.42", features = ["full", "test-util"] }
serial_test = "3"
criterion = { version = "0.5", features = ["html_reports"] }
fake = { version = "3.0.1", features = ["derive", "url"] }
mongodb = { version = "3" }


[features]
Expand All @@ -44,6 +45,6 @@ harness = false
name = "mongo_bench"
harness = false

[[bench]]
name = "postgres_bench"
harness = false
# [[bench]]
# name = "postgres_bench"
# harness = false
14 changes: 12 additions & 2 deletions capp-queue/benches/mongo_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod common;
use capp_queue::{backend::mongodb::BsonSerializer, MongoTaskQueue, TaskQueue};
use criterion::{criterion_group, criterion_main, Criterion};
use dotenvy::dotenv;
// use mongodb::{options::ClientOptions, Client};
use mongodb::{options::ClientOptions, Client};
use tokio::runtime::Runtime;

use capp_queue::task::Task;
Expand All @@ -19,7 +19,17 @@ async fn get_mongo_connection() -> String {

async fn setup_queue() -> MongoTaskQueue<BenchTaskData, BsonSerializer> {
let uri = get_mongo_connection().await;
let queue = MongoTaskQueue::new(&uri, QUEUE_NAME)
let client_options = ClientOptions::parse(&uri)
.await
.expect("Failed to parse options");
let client = Client::with_options(client_options.clone())
.expect("Failed to create client");
let db_name = client_options
.default_database
.as_ref()
.expect("No database specified");
let database = client.database(db_name);
let queue = MongoTaskQueue::new(database, QUEUE_NAME)
.await
.expect("Failed to create MongoTaskQueue");

Expand Down
24 changes: 21 additions & 3 deletions capp-queue/src/backend/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::{Task, TaskId, TaskQueue, TaskQueueError, TaskSerializer};
use async_trait::async_trait;
use mongodb::{
bson::{self, doc},
options::ClientOptions,
Client, Collection,
// options::ClientOptions,
Client,
Collection,
};
use serde::{de::DeserializeOwned, Serialize};
use std::marker::PhantomData;
Expand All @@ -23,7 +24,7 @@ where
D: Send + Sync + 'static,
S: TaskSerializer + Send + Sync,
{
pub async fn new(
/* pub async fn new(
connection_string: &str,
queue_name: &str,
) -> Result<Self, TaskQueueError> {
Expand All @@ -49,6 +50,23 @@ where
dlq_collection,
_marker: PhantomData,
})
} */

pub async fn new(
database: mongodb::Database,
queue_name: &str,
) -> Result<Self, TaskQueueError> {
// Collections store raw BSON documents now
let tasks_collection =
database.collection(&format!("{}_tasks", queue_name));
let dlq_collection = database.collection(&format!("{}_dlq", queue_name));

Ok(Self {
client: database.client().clone(),
tasks_collection,
dlq_collection,
_marker: PhantomData,
})
}
}

Expand Down
116 changes: 54 additions & 62 deletions capp-queue/tests/mongodb_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod tests {
};
use dotenvy::dotenv;
use mongodb::bson::{self, doc};
use mongodb::{options::ClientOptions, Client};
use mongodb::{options::ClientOptions, Client, Database};
use serde::{Deserialize, Serialize};
use std::time::Duration;

Expand All @@ -16,46 +16,45 @@ mod tests {
value: u32,
}

async fn get_mongo_connection() -> String {
async fn get_mongodb() -> Database {
dotenv().ok();
std::env::var("MONGODB_URI").expect("Set MONGODB_URI env variable")
let uri =
std::env::var("MONGODB_URI").expect("Set MONGODB_URI env variable");
let client_options = ClientOptions::parse(&uri)
.await
.expect("Failed to parse options");
let client = Client::with_options(client_options.clone())
.expect("Failed to create client");
let db_name = client_options
.default_database
.as_ref()
.expect("No database specified");
let database = client.database(db_name);
database
}

async fn verify_collection_exists(
client: &Client,
db_name: &str,
db: &Database,
collection_name: &str,
) -> bool {
let db = client.database(db_name);
let collections = db.list_collection_names().await.unwrap();
collections.contains(&collection_name.to_string())
}

async fn cleanup_collections(name: &str) -> Result<(), mongodb::error::Error> {
let uri = get_mongo_connection().await;
let client_options = ClientOptions::parse(&uri).await?;
let client = Client::with_options(client_options.clone())?;

let db_name = client_options
.default_database
.as_ref()
.expect("No database specified in MongoDB URI");

let db = client.database(db_name);

let db = get_mongodb().await;
let tasks_collection_name = format!("{}_tasks", name);
let dlq_collection_name = format!("{}_dlq", name);

// Check if collections exist before dropping
if verify_collection_exists(&client, db_name, &tasks_collection_name).await
{
if verify_collection_exists(&db, &tasks_collection_name).await {
tracing::info!("Dropping collection: {}", tasks_collection_name);
db.collection::<Task<TestData>>(&tasks_collection_name)
.drop()
.await?;
}

if verify_collection_exists(&client, db_name, &dlq_collection_name).await {
if verify_collection_exists(&db, &dlq_collection_name).await {
tracing::info!("Dropping collection: {}", dlq_collection_name);
db.collection::<Task<TestData>>(&dlq_collection_name)
.drop()
Expand All @@ -70,8 +69,10 @@ mod tests {
tracing::error!("Cleanup failed: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(100)).await;
let uri = get_mongo_connection().await;
MongoTaskQueue::new(&uri, name)

let db = get_mongodb().await;

MongoTaskQueue::new(db, name)
.await
.expect("Failed to create MongoTaskQueue")
}
Expand All @@ -81,11 +82,12 @@ mod tests {
// Setup test queue
let queue_name = "test_push";
let task = Task::new(TestData { value: 1 });
let uri = get_mongo_connection().await;
let queue =
MongoTaskQueue::<TestData, BsonSerializer>::new(&uri, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Get connection string and create database instance
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push task
queue.push(&task).await.expect("Failed to push task");
Expand All @@ -109,11 +111,11 @@ mod tests {
#[tokio::test]
async fn test_push_and_pop() {
let queue_name = "test_push_pop";
let uri = get_mongo_connection().await;
let queue =
MongoTaskQueue::<TestData, BsonSerializer>::new(&uri, queue_name)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;

let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

let task = Task::new(TestData { value: 42 });
let original_id = task.task_id;
Expand All @@ -136,12 +138,10 @@ mod tests {
#[tokio::test]
async fn test_pop_status_handling() {
let queue_name = "test_pop_status";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push a task
let task = Task::new(TestData { value: 42 });
Expand Down Expand Up @@ -177,12 +177,10 @@ mod tests {
#[tokio::test]
async fn test_push_pop_order() {
let queue_name = "test_push_pop_order";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push multiple tasks in sequence
let tasks = vec![
Expand Down Expand Up @@ -215,12 +213,10 @@ mod tests {
#[tokio::test]
async fn test_ack() {
let queue_name = "test_ack";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push and pop a task
let task = Task::new(TestData { value: 42 });
Expand Down Expand Up @@ -255,12 +251,10 @@ mod tests {
#[tokio::test]
async fn test_set() {
let queue_name = "test_set";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Create and push initial task
let mut task = Task::new(TestData { value: 42 });
Expand Down Expand Up @@ -341,12 +335,10 @@ mod tests {
#[tokio::test]
async fn test_nack() {
let queue_name = "test_nack";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push and pop a task
let task = Task::new(TestData { value: 42 });
Expand Down
1 change: 0 additions & 1 deletion capp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ rand = "0.8"
md5 = "0.7"
url = "2.5"
base64 = "0.22"
# tempfile = "3"

[features]
http = ["dep:reqwest", "capp-config/http"]
Expand Down