Skip to content

Commit

Permalink
Merge pull request #64 from oiwn/dev
Browse files Browse the repository at this point in the history
Mongodb backend now accespt Database instead of uri
  • Loading branch information
oiwn authored Feb 10, 2025
2 parents f8691d8 + 2aad73f commit f8f8112
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 71 deletions.
7 changes: 4 additions & 3 deletions capp-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tokio = { version = "1.42", features = ["full", "test-util"] }
serial_test = "3"
criterion = { version = "0.5", features = ["html_reports"] }
fake = { version = "3.0.1", features = ["derive", "url"] }
mongodb = { version = "3" }


[features]
Expand All @@ -44,6 +45,6 @@ harness = false
name = "mongo_bench"
harness = false

[[bench]]
name = "postgres_bench"
harness = false
# [[bench]]
# name = "postgres_bench"
# harness = false
14 changes: 12 additions & 2 deletions capp-queue/benches/mongo_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod common;
use capp_queue::{backend::mongodb::BsonSerializer, MongoTaskQueue, TaskQueue};
use criterion::{criterion_group, criterion_main, Criterion};
use dotenvy::dotenv;
// use mongodb::{options::ClientOptions, Client};
use mongodb::{options::ClientOptions, Client};
use tokio::runtime::Runtime;

use capp_queue::task::Task;
Expand All @@ -19,7 +19,17 @@ async fn get_mongo_connection() -> String {

async fn setup_queue() -> MongoTaskQueue<BenchTaskData, BsonSerializer> {
let uri = get_mongo_connection().await;
let queue = MongoTaskQueue::new(&uri, QUEUE_NAME)
let client_options = ClientOptions::parse(&uri)
.await
.expect("Failed to parse options");
let client = Client::with_options(client_options.clone())
.expect("Failed to create client");
let db_name = client_options
.default_database
.as_ref()
.expect("No database specified");
let database = client.database(db_name);
let queue = MongoTaskQueue::new(database, QUEUE_NAME)
.await
.expect("Failed to create MongoTaskQueue");

Expand Down
24 changes: 21 additions & 3 deletions capp-queue/src/backend/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::{Task, TaskId, TaskQueue, TaskQueueError, TaskSerializer};
use async_trait::async_trait;
use mongodb::{
bson::{self, doc},
options::ClientOptions,
Client, Collection,
// options::ClientOptions,
Client,
Collection,
};
use serde::{de::DeserializeOwned, Serialize};
use std::marker::PhantomData;
Expand All @@ -23,7 +24,7 @@ where
D: Send + Sync + 'static,
S: TaskSerializer + Send + Sync,
{
pub async fn new(
/* pub async fn new(
connection_string: &str,
queue_name: &str,
) -> Result<Self, TaskQueueError> {
Expand All @@ -49,6 +50,23 @@ where
dlq_collection,
_marker: PhantomData,
})
} */

pub async fn new(
database: mongodb::Database,
queue_name: &str,
) -> Result<Self, TaskQueueError> {
// Collections store raw BSON documents now
let tasks_collection =
database.collection(&format!("{}_tasks", queue_name));
let dlq_collection = database.collection(&format!("{}_dlq", queue_name));

Ok(Self {
client: database.client().clone(),
tasks_collection,
dlq_collection,
_marker: PhantomData,
})
}
}

Expand Down
116 changes: 54 additions & 62 deletions capp-queue/tests/mongodb_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod tests {
};
use dotenvy::dotenv;
use mongodb::bson::{self, doc};
use mongodb::{options::ClientOptions, Client};
use mongodb::{options::ClientOptions, Client, Database};
use serde::{Deserialize, Serialize};
use std::time::Duration;

Expand All @@ -16,46 +16,45 @@ mod tests {
value: u32,
}

async fn get_mongo_connection() -> String {
async fn get_mongodb() -> Database {
dotenv().ok();
std::env::var("MONGODB_URI").expect("Set MONGODB_URI env variable")
let uri =
std::env::var("MONGODB_URI").expect("Set MONGODB_URI env variable");
let client_options = ClientOptions::parse(&uri)
.await
.expect("Failed to parse options");
let client = Client::with_options(client_options.clone())
.expect("Failed to create client");
let db_name = client_options
.default_database
.as_ref()
.expect("No database specified");
let database = client.database(db_name);
database
}

async fn verify_collection_exists(
client: &Client,
db_name: &str,
db: &Database,
collection_name: &str,
) -> bool {
let db = client.database(db_name);
let collections = db.list_collection_names().await.unwrap();
collections.contains(&collection_name.to_string())
}

async fn cleanup_collections(name: &str) -> Result<(), mongodb::error::Error> {
let uri = get_mongo_connection().await;
let client_options = ClientOptions::parse(&uri).await?;
let client = Client::with_options(client_options.clone())?;

let db_name = client_options
.default_database
.as_ref()
.expect("No database specified in MongoDB URI");

let db = client.database(db_name);

let db = get_mongodb().await;
let tasks_collection_name = format!("{}_tasks", name);
let dlq_collection_name = format!("{}_dlq", name);

// Check if collections exist before dropping
if verify_collection_exists(&client, db_name, &tasks_collection_name).await
{
if verify_collection_exists(&db, &tasks_collection_name).await {
tracing::info!("Dropping collection: {}", tasks_collection_name);
db.collection::<Task<TestData>>(&tasks_collection_name)
.drop()
.await?;
}

if verify_collection_exists(&client, db_name, &dlq_collection_name).await {
if verify_collection_exists(&db, &dlq_collection_name).await {
tracing::info!("Dropping collection: {}", dlq_collection_name);
db.collection::<Task<TestData>>(&dlq_collection_name)
.drop()
Expand All @@ -70,8 +69,10 @@ mod tests {
tracing::error!("Cleanup failed: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(100)).await;
let uri = get_mongo_connection().await;
MongoTaskQueue::new(&uri, name)

let db = get_mongodb().await;

MongoTaskQueue::new(db, name)
.await
.expect("Failed to create MongoTaskQueue")
}
Expand All @@ -81,11 +82,12 @@ mod tests {
// Setup test queue
let queue_name = "test_push";
let task = Task::new(TestData { value: 1 });
let uri = get_mongo_connection().await;
let queue =
MongoTaskQueue::<TestData, BsonSerializer>::new(&uri, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Get connection string and create database instance
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push task
queue.push(&task).await.expect("Failed to push task");
Expand All @@ -109,11 +111,11 @@ mod tests {
#[tokio::test]
async fn test_push_and_pop() {
let queue_name = "test_push_pop";
let uri = get_mongo_connection().await;
let queue =
MongoTaskQueue::<TestData, BsonSerializer>::new(&uri, queue_name)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;

let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

let task = Task::new(TestData { value: 42 });
let original_id = task.task_id;
Expand All @@ -136,12 +138,10 @@ mod tests {
#[tokio::test]
async fn test_pop_status_handling() {
let queue_name = "test_pop_status";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push a task
let task = Task::new(TestData { value: 42 });
Expand Down Expand Up @@ -177,12 +177,10 @@ mod tests {
#[tokio::test]
async fn test_push_pop_order() {
let queue_name = "test_push_pop_order";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push multiple tasks in sequence
let tasks = vec![
Expand Down Expand Up @@ -215,12 +213,10 @@ mod tests {
#[tokio::test]
async fn test_ack() {
let queue_name = "test_ack";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push and pop a task
let task = Task::new(TestData { value: 42 });
Expand Down Expand Up @@ -255,12 +251,10 @@ mod tests {
#[tokio::test]
async fn test_set() {
let queue_name = "test_set";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Create and push initial task
let mut task = Task::new(TestData { value: 42 });
Expand Down Expand Up @@ -341,12 +335,10 @@ mod tests {
#[tokio::test]
async fn test_nack() {
let queue_name = "test_nack";
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(
&get_mongo_connection().await,
queue_name,
)
.await
.expect("Failed to create MongoTaskQueue");
let db = get_mongodb().await;
let queue = MongoTaskQueue::<TestData, BsonSerializer>::new(db, queue_name)
.await
.expect("Failed to create MongoTaskQueue");

// Push and pop a task
let task = Task::new(TestData { value: 42 });
Expand Down
1 change: 0 additions & 1 deletion capp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ rand = "0.8"
md5 = "0.7"
url = "2.5"
base64 = "0.22"
# tempfile = "3"

[features]
http = ["dep:reqwest", "capp-config/http"]
Expand Down

0 comments on commit f8f8112

Please sign in to comment.