Skip to content

Commit

Permalink
Merge pull request #44 from oiwn/dev
Browse files Browse the repository at this point in the history
adding mongodb backend
  • Loading branch information
oiwn authored Dec 13, 2024
2 parents 8b0e5b1 + 981a588 commit a135edd
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 0 deletions.
2 changes: 2 additions & 0 deletions capp-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ serde_json = { workspace = true }
async-trait = { workspace = true }
uuid = { workspace = true }
rustis = { version = "0.13", features = ["tokio-runtime"], optional = true }
# mongodb = { version = "3", features = ["tokio-runtime"], optional = true }

[dev-dependencies]
dotenvy = "0.15"

[features]
redis = ["dep:tokio", "dep:rustis"]
# mongodb = ["dep:tokio", "dep:mongodb"]
4 changes: 4 additions & 0 deletions capp-queue/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ pub mod memory;
pub mod redis;
#[cfg(feature = "redis")]
pub mod redis_rr;
#[cfg(features = "mongodb")
pub mod mongodb;

pub use memory::InMemoryTaskQueue;
#[cfg(feature = "redis")]
pub use redis::RedisTaskQueue;
#[cfg(feature = "redis")]
pub use redis_rr::RedisRoundRobinTaskQueue;
#[cfg(features = "mongodb")
pub use monogodb::MongoTaskQueue;
161 changes: 161 additions & 0 deletions capp-queue/src/backend/mongodb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use async_trait::async_trait;
use mongodb::{
bson::{doc, Document},
options::{ClientOptions, FindOneAndDeleteOptions},
Client, Collection,
};
use serde::{de::DeserializeOwned, Serialize};
use std::marker::PhantomData;

use crate::queue::{TaskQueue, TaskQueueError};
use crate::task::{Task, TaskId};

pub struct MongoTaskQueue<D> {
pub client: Client,
pub tasks_collection: Collection<Document>,
pub dlq_collection: Collection<Document>,
_marker: PhantomData<D>,
}

impl<D> MongoTaskQueue<D> {
pub async fn new(
connection_string: &str,
queue_name: &str,
) -> Result<Self, TaskQueueError> {
let client_options = ClientOptions::parse(connection_string)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

let client = Client::with_options(client_options)
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

let db = client.database("task_queue");
let tasks_collection = db.collection(&format!("{}_tasks", queue_name));
let dlq_collection = db.collection(&format!("{}_dlq", queue_name));

// Create indexes
tasks_collection
.create_index(doc! { "task_id": 1 }, None)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

Ok(Self {
client,
tasks_collection,
dlq_collection,
_marker: PhantomData,
})
}
}

#[async_trait]
impl<D> TaskQueue<D> for MongoTaskQueue<D>
where
D: std::fmt::Debug
+ Clone
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
{
async fn push(&self, task: &Task<D>) -> Result<(), TaskQueueError> {
let task_doc = mongodb::bson::to_document(&task)
.map_err(|e| TaskQueueError::SerdeError(e.to_string()))?;

self.tasks_collection
.insert_one(task_doc, None)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

Ok(())
}

async fn pop(&self) -> Result<Task<D>, TaskQueueError> {
let options = FindOneAndDeleteOptions::default();

let result = self
.tasks_collection
.find_one_and_delete(doc! {}, options)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

match result {
Some(doc) => {
let task: Task<D> = mongodb::bson::from_document(doc)
.map_err(|e| TaskQueueError::SerdeError(e.to_string()))?;
Ok(task)
}
None => Err(TaskQueueError::QueueEmpty),
}
}

async fn ack(&self, task_id: &TaskId) -> Result<(), TaskQueueError> {
let result = self
.tasks_collection
.delete_one(doc! { "task_id": task_id.to_string() }, None)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

if result.deleted_count == 0 {
return Err(TaskQueueError::TaskNotFound(*task_id));
}

Ok(())
}

async fn nack(&self, task: &Task<D>) -> Result<(), TaskQueueError> {
let task_doc = mongodb::bson::to_document(&task)
.map_err(|e| TaskQueueError::SerdeError(e.to_string()))?;

// Start session for transaction
let mut session = self
.client
.start_session(None)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

session
.start_transaction(None)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

// Move to DLQ and remove from main queue
self.dlq_collection
.insert_one_with_session(task_doc, None, &mut session)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

self.tasks_collection
.delete_one_with_session(
doc! { "task_id": task.task_id.to_string() },
None,
&mut session,
)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

session
.commit_transaction()
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

Ok(())
}

async fn set(&self, task: &Task<D>) -> Result<(), TaskQueueError> {
let task_doc = mongodb::bson::to_document(&task)
.map_err(|e| TaskQueueError::SerdeError(e.to_string()))?;

self.tasks_collection
.replace_one(
doc! { "task_id": task.task_id.to_string() },
task_doc,
None,
)
.await
.map_err(|e| TaskQueueError::QueueError(e.to_string()))?;

Ok(())
}
}
2 changes: 2 additions & 0 deletions capp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ derive_builder = { version = "0.20" }
reqwest = { version = "0.12", features = ["gzip", "rustls-tls", "json"], optional = true }
serde_yaml = "0.9"
rustis = { version = "0.13", features = ["tokio-runtime"], optional = true }
# mongodb = { version = "3", features = ["tokio-runtime"], optional = true }
tracing-futures = "0.2"

[dev-dependencies]
Expand All @@ -66,3 +67,4 @@ http = ["dep:reqwest", "capp-config/http"]
router = ["capp-config/router"]
healthcheck = ["dep:reqwest"]
redis = ["dep:rustis", "capp-queue/redis"]
# mongodb = ["dep:mongodb", "capp-queue/mongodb"]

0 comments on commit a135edd

Please sign in to comment.