Merge pull request #58 from oiwn/dev
Dev
oiwn authored Feb 4, 2025
2 parents fe54a1a + c1932e7 commit bce0020
Showing 8 changed files with 124 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .amc.toml
@@ -1,5 +1,5 @@
 delimiter = "---"
-extensions = [".rs", ".js", ".py", ".toml"] # Add any file extensions you want to process
+extensions = [".rs", ".toml"]
 llm_prompt = """
 This is a collection of source code files from a project. Each file is separated by '---' delimiters.
 The files include Git metadata showing their last modification details.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -34,4 +34,4 @@ tokio = { version = "1.42", features = ["full"] }
 # Utils
 async-trait = { version = "0.1" }
 uuid = { version = "1.11", features = ["v4", "serde"] }
-rand = { version = "0.8" }
+rand = { version = "0.8" }
12 changes: 11 additions & 1 deletion capp/Cargo.toml
@@ -56,7 +56,7 @@ http-body-util = "0.1"
 bytes = "1.6"
 pin-project-lite = "0.2"
 dotenvy = "0.15"
-scraper = "0.21"
+scraper = "0.22"
 rand = "0.8"
 md5 = "0.7"
 url = "2.5"
@@ -70,3 +70,13 @@ healthcheck = ["dep:reqwest"]
 redis = ["capp-queue/redis", "dep:rustis"]
 mongodb = ["capp-queue/mongodb", "dep:mongodb"]
 postgres = ["capp-queue/postgres", "dep:sqlx"]
+
+[[example]]
+name = "basic"
+path = "../examples/basic.rs"
+
+# [[example]]
+# name = "hackernews"
+# path = "../examples/hackernews/main.rs"
+
+
2 changes: 2 additions & 0 deletions capp/src/lib.rs
@@ -44,6 +44,8 @@
 //! - `task`: Definitions and utilities for working with tasks.
 pub mod manager;
 pub mod prelude;
+#[cfg(feature = "http")]
+pub use config::http;
 
 // re-export
 pub use async_trait;
42 changes: 14 additions & 28 deletions capp/src/manager/worker.rs
@@ -8,6 +8,7 @@ use tokio::sync::{
     broadcast,
     mpsc::{self, error::TryRecvError},
 };
+use tracing::{debug, error, info, info_span, warn};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct WorkerId(usize);
@@ -82,11 +83,7 @@ where
         // Implement limiting amount of tasks per worker
         if let Some(limit) = self.options.task_limit {
             if self.stats.tasks_processed >= limit {
-                tracing::info!(
-                    "[{}] task_limit reached: {}",
-                    self.worker_id,
-                    limit
-                );
+                warn!("[{}] task_limit reached: {}", self.worker_id, limit);
                 return Ok(false);
             }
         };
@@ -110,11 +107,9 @@
                 task.set_succeed();
                 self.queue.set(&task).await.unwrap();
                 self.queue.ack(&task.task_id).await.unwrap();
-                tracing::info!(
+                info!(
                     "[{}] Task {} succeed: {:?}",
-                    self.worker_id,
-                    &task.task_id,
-                    &task.payload
+                    self.worker_id, &task.task_id, &task.payload
                 );
 
                 // record stats on success
@@ -125,22 +120,16 @@
                 task.set_retry(&err.to_string());
                 if task.retries < self.options.max_retries {
                     self.queue.push(&task).await.unwrap();
-                    tracing::error!(
+                    error!(
                         "[{}] Task {} failed, retrying ({}): {:?}",
-                        self.worker_id,
-                        &task.task_id,
-                        &task.retries,
-                        &err
+                        self.worker_id, &task.task_id, &task.retries, &err
                     );
                 } else {
                     task.set_dlq("Max retries");
                     self.queue.nack(&task).await.unwrap();
-                    tracing::error!(
+                    error!(
                         "[{}] Task {} failed, max reties ({}): {:?}",
-                        self.worker_id,
-                        &task.task_id,
-                        &task.retries,
-                        &err
+                        self.worker_id, &task.task_id, &task.retries, &err
                     );
                 }
 
@@ -150,7 +139,7 @@
                 }
             }
             Err(TaskQueueError::QueueEmpty) => {
-                tracing::warn!("[{}] No tasks found, waiting...", self.worker_id);
+                warn!("[{}] No tasks found, waiting...", self.worker_id);
                 // wait for a while till try to fetch task
                 tokio::time::sleep(self.options.no_task_found_delay).await;
             }
@@ -220,13 +209,13 @@ pub async fn worker_wrapper<Data, Comp, Ctx>(
     'worker: loop {
         tokio::select! {
             _ = terminate.recv() => {
-                tracing::info!("Terminating immediately");
+                info!("Terminating immediately");
                 return;
             },
             run_result = worker.run(), if !should_stop => {
                 match commands.try_recv() {
                     Ok(WorkerCommand::Shutdown) => {
-                        tracing::error!("[{}] Shutdown received", worker_id);
+                        error!("[{}] Shutdown received", worker_id);
                         should_stop = true;
                     }
                     Err(TryRecvError::Disconnected) => break 'worker,
@@ -245,15 +234,12 @@
 
         // If a stop command was received, finish any ongoing work and then exit.
         if should_stop {
-            tracing::info!(
-                "[{}] Completing current task before stopping.",
-                worker_id
-            );
+            info!("[{}] Completing current task before stopping.", worker_id);
             break;
         }
     }
 
-    tracing::info!("completed");
+    info!("completed");
 }
 
 /// This wrapper used to create new Worker setup internal logging
@@ -287,7 +273,7 @@ pub async fn worker_wrapper_old<Data, Comp, Ctx>(
     let mut should_stop = false;
 
     // setup spans
-    let span = tracing::info_span!("worker", _id = %worker_id);
+    let span = info_span!("worker", _id = %worker_id);
     let _enter = span.enter();
 
     'worker: loop {
8 changes: 5 additions & 3 deletions examples/basic.rs
@@ -5,8 +5,9 @@ use capp::prelude::{
 use capp::{
     config::Configurable,
     manager::{WorkersManager, WorkersManagerOptionsBuilder},
-    queue::{AbstractTaskQueue, InMemoryTaskQueue, TaskQueue},
-    task::Task,
+    queue::{
+        AbstractTaskQueue, InMemoryTaskQueue, JsonSerializer, Task, TaskQueue,
+    },
 };
 use serde::{Deserialize, Serialize};
 use std::{path, sync::Arc};
@@ -76,7 +77,8 @@ impl Computation<TaskData, Context> for DivisionComputation {
 /// total tasks = 9
 /// number of failed tasks = 4
 async fn make_storage() -> impl TaskQueue<TaskData> + Send + Sync {
-    let storage = InMemoryTaskQueue::new();
+    let storage: InMemoryTaskQueue<TaskData, JsonSerializer> =
+        InMemoryTaskQueue::new();
 
     for i in 1..=5 {
         let task: Task<TaskData> = Task::new(TaskData {
7 changes: 5 additions & 2 deletions notes.org
@@ -1,12 +1,15 @@
 #+title: Notes
 
 * Major Tasks
 ** Shared State (to run like a server with REST?)
+** tui interface
 ** consider to use https://crates.io/crates/config instead of current configuration?
 ** figure if it can get rid of returning yaml_value in favour of some generic configuration type
 ** DONE fix RoundRobin queue
+i think it's done
 ** Redis backend should return RedisError, now custom ones.
-** TODO mongodb queue
-** TODO postgres queue
+** DONE mongodb queue
+** DONE postgres queue
+** TODO benchmarks criterion
 
 * Tasks
85 changes: 85 additions & 0 deletions review.md
@@ -0,0 +1,85 @@
# Code Review: CAPP (Comprehensive Asynchronous Parallel Processing) Framework

## Overview
CAPP is a Rust framework for building distributed task processing systems,
with a particular focus on web crawlers. The codebase demonstrates strong Rust
practices and a well-thought-out architecture.

## Architecture Analysis

### Core Components

1. **Task Queue System**
- Multiple backend implementations (Redis, MongoDB, Postgres, In-Memory); see the trait sketch after this list
- Generic task handling with serialization support
- Dead Letter Queue (DLQ) for failed tasks
- Round-robin task distribution capability

2. **Worker Management**
- Concurrent worker execution with configurable limits
- Graceful shutdown handling
- Per-worker statistics tracking
- Task retry mechanism with configurable policies

3. **Configuration System**
- YAML-based configuration
- Proxy support with round-robin and random selection
- Environment variable integration
- Flexible HTTP client configuration
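
To make the queue abstraction concrete, here is a minimal sketch of what such a trait can look like. It is modeled on the calls visible in this commit's worker code (`push`, `pop`, `set`, `ack`, `nack`) with simplified stand-in types; the actual `TaskQueue` trait in capp-queue may differ in signatures and associated types.

```rust
use async_trait::async_trait;

// Simplified stand-ins for capp's actual task types (illustrative only).
pub struct Task<D> {
    pub task_id: u64,
    pub retries: u32,
    pub payload: D,
}

#[derive(Debug)]
pub enum TaskQueueError {
    QueueEmpty,
    QueueError(String),
}

// Sketch of the storage abstraction; each backend (Redis, MongoDB,
// Postgres, in-memory) would implement this trait.
#[async_trait]
pub trait TaskQueue<D: Send + Sync + 'static>: Send + Sync {
    async fn push(&self, task: &Task<D>) -> Result<(), TaskQueueError>;
    async fn pop(&self) -> Result<Task<D>, TaskQueueError>; // Err(QueueEmpty) when idle
    async fn set(&self, task: &Task<D>) -> Result<(), TaskQueueError>; // persist task state
    async fn ack(&self, task_id: &u64) -> Result<(), TaskQueueError>; // drop finished task
    async fn nack(&self, task: &Task<D>) -> Result<(), TaskQueueError>; // send to the DLQ
}
```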

### Design Patterns

1. **Builder Pattern**
- Effectively used for WorkerOptions and WorkersManagerOptions (a builder sketch follows this list)
- Clean configuration initialization
- Clear default values

2. **Trait-based Abstraction**
- `TaskQueue` trait for storage backends
- `Computation` trait for task processing
- `TaskSerializer` for data serialization

3. **Error Handling**
- Custom error types with thiserror
- Proper error propagation
- Contextual error messages
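
The builder idiom is easy to see in the options types. The sketch below reconstructs a plausible worker-options builder from fields that actually appear in this commit's worker code (`task_limit`, `max_retries`, `no_task_found_delay`); the builder name, method names, and default values are assumptions, not capp's exact API.

```rust
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct WorkerOptions {
    pub task_limit: Option<usize>,
    pub max_retries: u32,
    pub no_task_found_delay: Duration,
}

// Hypothetical builder, shaped like the WorkerOptions/WorkersManagerOptions
// builders the review refers to.
#[derive(Default)]
pub struct WorkerOptionsBuilder {
    task_limit: Option<usize>,
    max_retries: Option<u32>,
    no_task_found_delay: Option<Duration>,
}

impl WorkerOptionsBuilder {
    pub fn task_limit(mut self, limit: usize) -> Self {
        self.task_limit = Some(limit);
        self
    }
    pub fn max_retries(mut self, retries: u32) -> Self {
        self.max_retries = Some(retries);
        self
    }
    pub fn no_task_found_delay(mut self, delay: Duration) -> Self {
        self.no_task_found_delay = Some(delay);
        self
    }
    pub fn build(self) -> WorkerOptions {
        WorkerOptions {
            task_limit: self.task_limit, // unlimited unless set
            max_retries: self.max_retries.unwrap_or(3),
            no_task_found_delay: self
                .no_task_found_delay
                .unwrap_or(Duration::from_secs(1)),
        }
    }
}

fn main() {
    let opts = WorkerOptionsBuilder::default()
        .max_retries(5)
        .no_task_found_delay(Duration::from_millis(500))
        .build();
    println!("{opts:?}");
}
```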

## Strengths

1. **Modularity**
- Clean separation between components
- Feature flags for optional components
- Well-defined interfaces

2. **Concurrency Control**
- Proper use of tokio for async operations
- Thread-safe shared state handling
- Graceful shutdown mechanisms; see the select-and-flag sketch after this list

3. **Testing**
- Comprehensive test coverage
- Integration tests for each backend
- Mock implementations for testing
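
The graceful-shutdown behaviour is visible in `worker_wrapper` in this commit. Below is a minimal, self-contained sketch of the same select-and-flag pattern; `WorkerCommand` and `do_one_task` are simplified placeholders for capp's real types.

```rust
use tokio::sync::{broadcast, mpsc};

enum WorkerCommand {
    Shutdown,
}

// Race one unit of work against a broadcast terminate signal; a soft
// Shutdown command lets the in-flight task finish before the loop exits.
async fn worker_loop(
    mut terminate: broadcast::Receiver<()>,
    mut commands: mpsc::Receiver<WorkerCommand>,
) {
    let mut should_stop = false;
    loop {
        tokio::select! {
            _ = terminate.recv() => {
                println!("terminating immediately");
                return;
            }
            _ = do_one_task(), if !should_stop => {
                // Note a soft shutdown request, then finish up below.
                if let Ok(WorkerCommand::Shutdown) = commands.try_recv() {
                    should_stop = true;
                }
            }
        }
        if should_stop {
            println!("completing current task before stopping");
            break;
        }
    }
}

// Placeholder for pop-from-queue plus computation.
async fn do_one_task() {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

#[tokio::main]
async fn main() {
    let (_term_tx, term_rx) = broadcast::channel(1);
    let (cmd_tx, cmd_rx) = mpsc::channel(1);
    let worker = tokio::spawn(worker_loop(term_rx, cmd_rx));
    cmd_tx.send(WorkerCommand::Shutdown).await.unwrap();
    worker.await.unwrap();
}
```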

## Areas for Improvement

1. **Documentation**
- Generally good overall, but some public APIs lack detailed examples
- More inline documentation for complex algorithms would be helpful
- Consider adding architecture diagrams

2. **Error Handling Enhancements**
```rust
// Current:
pub enum TaskQueueError {
    QueueError(String),
    SerdeError(String),
    // ...
}

// Suggestion: Add more context
pub enum TaskQueueError {
    QueueError { message: String, context: String },
    SerdeError { message: String, data_type: String },
    // ...
}
```
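
With struct-style variants like these, a call site can attach context at the failure point, e.g. `TaskQueueError::QueueError { message: e.to_string(), context: "redis pop".into() }`, which would make errors from the different queue backends much easier to trace.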
