Skip to content

Commit

Permalink
add tests for ack and nack
Browse files Browse the repository at this point in the history
  • Loading branch information
oiwn committed Dec 27, 2024
1 parent 94d5873 commit 3f57ded
Showing 1 changed file with 117 additions and 1 deletion.
118 changes: 117 additions & 1 deletion capp-queue/tests/postgres_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(all(test, unix, feature = "postgres"))]
mod tests {
use capp_queue::{
JsonSerializer, PostgresTaskQueue, Task, TaskQueue, TaskQueueError,
JsonSerializer, PostgresTaskQueue, Task, TaskId, TaskQueue, TaskQueueError,
};
use serde::{Deserialize, Serialize};
use serial_test::serial;
Expand Down Expand Up @@ -170,6 +170,122 @@ mod tests {
}
}

#[tokio::test]
#[serial]
async fn test_ack_successful() {
let queue = PostgresTaskQueue::<TestData, JsonSerializer>::new(
&get_db_connection().await,
)
.await
.expect("Failed to create PostgresTaskQueue");

// Clean up before test
cleanup_database(&queue.pool)
.await
.expect("Failed to clean database");

// Create and push a task
let task = Task::new(TestData { value: 42 });
let task_id = task.task_id;
queue.push(&task).await.expect("Failed to push task");

// Ack the task
queue.ack(&task_id).await.expect("Failed to ack task");

// Verify task was removed
let result = sqlx::query!(
"SELECT COUNT(*) as count FROM tasks WHERE id = $1",
task_id.get(),
)
.fetch_one(&queue.pool)
.await
.expect("Failed to query tasks");

assert_eq!(result.count.unwrap(), 0, "Task should be removed after ack");
}

#[tokio::test]
#[serial]
async fn test_ack_nonexistent() {
let queue = PostgresTaskQueue::<TestData, JsonSerializer>::new(
&get_db_connection().await,
)
.await
.expect("Failed to create PostgresTaskQueue");

// Clean up before test
cleanup_database(&queue.pool)
.await
.expect("Failed to clean database");

// Try to ack non-existent task
let non_existent_id = TaskId::new();
match queue.ack(&non_existent_id).await {
Err(TaskQueueError::TaskNotFound(_)) => {}
other => panic!("Expected TaskNotFound error, got {:?}", other),
}
}

#[tokio::test]
#[serial]
async fn test_nack_with_dlq() {
let queue = PostgresTaskQueue::<TestData, JsonSerializer>::new(
&get_db_connection().await,
)
.await
.expect("Failed to create PostgresTaskQueue");

// Clean up before test
cleanup_database(&queue.pool)
.await
.expect("Failed to clean database");

// Create and push a task
let mut task = Task::new(TestData { value: 42 });
queue.push(&task).await.expect("Failed to push task");

// Set error message
task.set_dlq("Test error message");

// Nack the task
queue.nack(&task).await.expect("Failed to nack task");

// Verify task was moved to DLQ
let row = sqlx::query!(
r#"
SELECT id, payload, error_msg
FROM dlq
WHERE id = $1
"#,
task.task_id.get(),
)
.fetch_one(&queue.pool)
.await
.expect("Failed to query DLQ");

assert_eq!(row.id, task.task_id.get(), "Task ID should match");
assert_eq!(
row.error_msg.unwrap(),
"Test error message",
"Error message should match"
);

// Verify task was removed from main queue
let result = sqlx::query!(
"SELECT COUNT(*) as count FROM tasks WHERE id = $1",
task.task_id.get(),
)
.fetch_one(&queue.pool)
.await
.expect("Failed to query tasks");

assert_eq!(
result.count.unwrap(),
0,
"Task should be removed from main queue"
);
}

#[tokio::test]
#[serial]
async fn test_set() {
Expand Down

0 comments on commit 3f57ded

Please sign in to comment.