Skip to content

Commit

Permalink
trying to crawl through bullshit
Browse files Browse the repository at this point in the history
  • Loading branch information
oiwn committed Dec 16, 2024
1 parent 33c019b commit 17a3c58
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 116 deletions.
15 changes: 12 additions & 3 deletions capp-queue/src/backend/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ where
}
Ok(())
}

async fn nack(&self, task: &Task<D>) -> Result<(), TaskQueueError> {
let mut session = self.client.start_session().await?; // Convert to MongodbError

Expand All @@ -112,10 +113,18 @@ where
}

async fn set(&self, task: &Task<D>) -> Result<(), TaskQueueError> {
self.tasks_collection
.replace_one(doc! { "task_id": task.task_id.to_string() }, task)
.await?; // Convert to MongodbError
let result = self
.tasks_collection
.replace_one(
// Use toString() since that's how it's stored in MongoDB
doc! { "task_id": task.task_id.to_string() },
task,
)
.await?;

Check warning on line 123 in capp-queue/src/backend/mongodb.rs

View check run for this annotation

Codecov / codecov/patch

capp-queue/src/backend/mongodb.rs#L116-L123

Added lines #L116 - L123 were not covered by tests

if result.matched_count == 0 {
return Err(TaskQueueError::TaskNotFound(task.task_id));
}

Check warning on line 127 in capp-queue/src/backend/mongodb.rs

View check run for this annotation

Codecov / codecov/patch

capp-queue/src/backend/mongodb.rs#L125-L127

Added lines #L125 - L127 were not covered by tests
Ok(())
}
}
Expand Down
19 changes: 18 additions & 1 deletion capp-queue/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ impl std::fmt::Display for TaskId {
}
}

/* #[cfg(feature = "mongodb")]
impl From<TaskId> for mongodb::bson::Binary {
fn from(id: TaskId) -> Self {
mongodb::bson::Binary {
subtype: mongodb::bson::spec::BinarySubtype::Uuid,
bytes: id.get().as_bytes().to_vec(),
}
}
}
#[cfg(feature = "mongodb")]
impl From<mongodb::bson::Binary> for TaskId {
fn from(binary: mongodb::bson::Binary) -> Self {
TaskId(uuid::Uuid::from_bytes(binary.bytes.try_into().unwrap()))
}
}
#[cfg(feature = "mongodb")]
impl From<TaskId> for mongodb::bson::Uuid {
fn from(id: TaskId) -> Self {
Expand All @@ -135,7 +152,7 @@ impl From<mongodb::bson::Uuid> for TaskId {
fn from(uuid: mongodb::bson::Uuid) -> Self {
TaskId(uuid::Uuid::from_bytes(uuid.bytes()))
}
}
} */

#[cfg(test)]
mod tests {
Expand Down
209 changes: 97 additions & 112 deletions capp-queue/tests/mongodb_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ mod tests {
if let Err(e) = cleanup_collections(name).await {
tracing::error!("Cleanup failed: {:?}", e);
}
// ensure cleanup is complete
tokio::time::sleep(Duration::from_millis(100)).await;
let uri = get_mongo_connection().await;
MongoTaskQueue::new(&uri, name)
Expand All @@ -81,7 +80,6 @@ mod tests {
let bson_doc = bson::to_document(&task).expect("Failed to convert to BSON");
let task_from_bson: Task<TestData> =
bson::from_document(bson_doc).expect("Failed to convert from BSON");

// Verify task_id survived the round trip
assert_eq!(task.task_id, task_from_bson.task_id);
}
Expand Down Expand Up @@ -158,37 +156,45 @@ mod tests {
.expect("Cleanup failed");
}

/* #[tokio::test]
async fn test_collection_setup_and_dlq() {
let queue_name = "test-setup-dlq";
#[tokio::test]
async fn test_push_and_verify_format() {
let queue_name = "test_push_format";
let queue = setup_queue(queue_name).await;

// Push two tasks
let task1 = Task::new(TestData { value: 1 });
let task2 = Task::new(TestData { value: 2 });
queue.push(&task1).await.expect("Failed to push task1");
queue.push(&task2).await.expect("Failed to push task2");
// Pop and nack one task to create DLQ
let popped_task = queue.pop().await.expect("Failed to pop task");
queue.nack(&popped_task).await.expect("Failed to nack task");
// Create and push a task
let task = Task::new(TestData { value: 42 });
let task_id = task.task_id;
queue.push(&task).await.expect("Failed to push task");

// Verify tasks collection exists and has one remaining task
let tasks_count = queue
// Get raw document and inspect the BSON format
let raw_doc = queue
.tasks_collection
.count_documents(doc! {})
.find_one(doc! {})
.await
.expect("Failed to count tasks");
assert_eq!(tasks_count, 1, "Should have one task remaining");
.expect("Failed to query")
.expect("Document should exist");

// Convert to BSON document to inspect actual format
let bson_doc = mongodb::bson::to_document(&raw_doc)
.expect("Failed to convert to BSON");
println!("Raw BSON document in MongoDB: {:#?}", bson_doc);

// Get the task_id field and print its specific type and value
if let Some(task_id_bson) = bson_doc.get("task_id") {
println!("task_id type: {:?}", task_id_bson.element_type());
println!("task_id value: {:?}", task_id_bson);
}

// Print the UUID we're searching with
println!("Original UUID bytes: {:?}", task_id.get().as_bytes());

// Verify DLQ collection exists and has one task
let dlq_count = queue
.dlq_collection
.count_documents(doc! {})
// Try finding with direct BSON query
let found = queue
.tasks_collection
.find_one(bson_doc)
.await
.expect("Failed to count DLQ");
assert_eq!(dlq_count, 1, "Should have one task in DLQ");
.expect("Query failed");
println!("Found with exact BSON document: {:?}", found.is_some());

// Cleanup
cleanup_collections(queue_name)
Expand All @@ -197,111 +203,90 @@ mod tests {
}

#[tokio::test]
async fn test_push() {
tracing_subscriber::fmt::init();
// Setup
let queue_name = "test-push";
async fn test_task_id_mongodb_format() {
let queue_name = "test-id-format";
let queue = setup_queue(queue_name).await;
let test_value = 42;
let task = Task::new(TestData { value: test_value });
let task_id = task.task_id;

// Push task
// Create and push a test task
let task = Task::new(TestData { value: 42 });
let task_id = task.task_id;
queue.push(&task).await.expect("Failed to push task");

// Debug: Print collection contents
let all_docs = queue
.tasks_collection
.find(doc! {})
.await
.expect("Failed to query collection")
.collect::<Vec<_>>()
.await;
tracing::info!("Collection contents: {:?}", all_docs);
// Verify task exists using find with no filter first
let result = queue
// Get the raw BSON document to examine how it's actually stored
let raw_doc = queue
.tasks_collection
.find_one(doc! {})
.await
.expect("Failed to query task");
assert!(result.is_some(), "Collection should not be empty");
// Now try to find specific task using proper BSON UUID
let result = queue
.expect("Failed to query")
.expect("Document should exist");

// Get the raw document as BSON to inspect it
let bson_doc = mongodb::bson::to_document(&raw_doc)
.expect("Failed to convert to BSON");
println!("Raw BSON document in MongoDB: {:#?}", bson_doc);

// Specifically examine the task_id field
let task_id_field =
bson_doc.get("task_id").expect("task_id field should exist");
println!("task_id type: {:?}", task_id_field.element_type());
println!("task_id value: {:#?}", task_id_field);
println!("Original UUID bytes: {:?}", task_id.get().as_bytes());

// Try to find document with exact BSON match
let found = queue
.tasks_collection
.find_one(doc! {
"task_id": mongodb::bson::Binary {
subtype: mongodb::bson::spec::BinarySubtype::Uuid,
bytes: task_id.get().as_bytes().to_vec(),
}
})
.find_one(doc! { "task_id": task_id.to_string() })
.await
.expect("Failed to query task");
assert!(result.is_some(), "Task should exist in collection");
// Verify task data
if let Some(stored_task) = result {
assert_eq!(
stored_task.payload.value, test_value,
"Task payload should match"
);
assert_eq!(stored_task.task_id, task_id, "Task ID should match");
}
.expect("Failed to query")
.is_some();
println!("Found with exact BSON document: {}", found);

// Cleanup
// Clean up
cleanup_collections(queue_name)
.await
.expect("Cleanup failed");
}

#[tokio::test]
async fn test_collection_setup() {
let queue_name = "test-setup";
let uri = get_mongo_connection().await;
let client_options = ClientOptions::parse(&uri)
.await
.expect("Failed to parse MongoDB options");
let db_name = client_options
.default_database
.as_ref()
.expect("No database specified in MongoDB URI")
.clone();
async fn test_inspect_task_storage() {
let queue_name = "test_inspect";
let queue = setup_queue(queue_name).await;

// Create queue
let _queue = setup_queue(queue_name).await;
// Create and push a single task
let task = Task::new(TestData { value: 42 });
let task_id = task.task_id;
queue.push(&task).await.expect("Failed to push task");

// Verify collections exist
let client = Client::with_options(client_options)
.expect("Failed to create MongoDB client");
// Get raw document and print its BSON format for inspection
let raw_doc = queue
.tasks_collection
.find_one(doc! {})
.await
.expect("Failed to query")
.expect("Document should exist");

// Convert to BSON document to inspect actual format
let bson_doc = mongodb::bson::to_document(&raw_doc)
.expect("Failed to convert to BSON");
println!("Raw BSON document in MongoDB: {:#?}", bson_doc);

// Get the task_id field and print its specific type and value
if let Some(task_id_bson) = bson_doc.get("task_id") {
println!("task_id type: {:?}", task_id_bson.element_type());
println!("task_id value: {:?}", task_id_bson);
}

assert!(
verify_collection_exists(
&client,
&db_name,
&format!("{}_tasks", queue_name)
)
.await,
"Tasks collection should exist"
);
assert!(
verify_collection_exists(
&client,
&db_name,
&format!("{}_dlq", queue_name)
)
.await,
"DLQ collection should exist"
);
// Print the UUID we're searching with
println!("Original UUID bytes: {:?}", task_id.get().as_bytes());

// Cleanup
cleanup_collections(queue_name)
// Try finding with direct BSON query
let found = queue
.tasks_collection
.find_one(bson_doc)
.await
.expect("Cleanup failed");
} */
.expect("Query failed");
println!("Found with exact BSON document: {:?}", found.is_some());

// No cleanup - leave the data in MongoDB for manual inspection
}
}

0 comments on commit 17a3c58

Please sign in to comment.