From 21b01cd936baae6b5dfa958460b98aa0c41c894c Mon Sep 17 00:00:00 2001 From: oiwn Date: Fri, 17 Jan 2025 17:34:05 +0700 Subject: [PATCH 1/3] fix import --- capp/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/capp/src/lib.rs b/capp/src/lib.rs index d2229aa..d281f32 100644 --- a/capp/src/lib.rs +++ b/capp/src/lib.rs @@ -44,6 +44,8 @@ //! - `task`: Definitions and utilities for working with tasks. pub mod manager; pub mod prelude; +#[cfg(feature = "http")] +pub use config::http; // re-export pub use async_trait; From 428fadd37fd42af176bc8b5634ecf663eac84221 Mon Sep 17 00:00:00 2001 From: oiwn Date: Tue, 4 Feb 2025 21:43:11 +0700 Subject: [PATCH 2/3] little tweaks here and there, no code touched --- .amc.toml | 2 +- notes.org | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.amc.toml b/.amc.toml index b3cb034..273be38 100644 --- a/.amc.toml +++ b/.amc.toml @@ -1,5 +1,5 @@ delimiter = "---" -extensions = [".rs", ".js", ".py", ".toml"] # Add any file extensions you want to process +extensions = [".rs", ".toml"] llm_prompt = """ This is a collection of source code files from a project. Each file is separated by '---' delimiters. The files include Git metadata showing their last modification details. diff --git a/notes.org b/notes.org index 424e40d..41295c1 100644 --- a/notes.org +++ b/notes.org @@ -1,6 +1,7 @@ #+title: Notes * Major Tasks +** consider to use https://crates.io/crates/config instead of current configuration? ** figure if it can get rid of returning yaml_value in favour of some generic configuration type ** DONE fix RoundRobin queue i think it's done From c1932e74a25e2abb026ca72f6dce5ac05985e7f9 Mon Sep 17 00:00:00 2001 From: oiwn Date: Tue, 4 Feb 2025 23:50:28 +0700 Subject: [PATCH 3/3] basic example now working --- Cargo.toml | 2 +- capp/Cargo.toml | 12 +++++- capp/src/manager/worker.rs | 42 +++++++------------ examples/basic.rs | 8 ++-- notes.org | 6 ++- review.md | 85 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 120 insertions(+), 35 deletions(-) create mode 100644 review.md diff --git a/Cargo.toml b/Cargo.toml index 183bf8a..3764531 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,4 +34,4 @@ tokio = { version = "1.42", features = ["full"] } # Utils async-trait = { version = "0.1" } uuid = { version = "1.11", features = ["v4", "serde"] } -rand = { version = "0.8" } +rand = { version = "0.8" } diff --git a/capp/Cargo.toml b/capp/Cargo.toml index 0629d9d..3d8a60a 100644 --- a/capp/Cargo.toml +++ b/capp/Cargo.toml @@ -56,7 +56,7 @@ http-body-util = "0.1" bytes = "1.6" pin-project-lite = "0.2" dotenvy = "0.15" -scraper = "0.21" +scraper = "0.22" rand = "0.8" md5 = "0.7" url = "2.5" @@ -70,3 +70,13 @@ healthcheck = ["dep:reqwest"] redis = ["capp-queue/redis", "dep:rustis"] mongodb = ["capp-queue/mongodb", "dep:mongodb"] postgres = ["capp-queue/postgres", "dep:sqlx"] + +[[example]] +name = "basic" +path = "../examples/basic.rs" + +# [[example]] +# name = "hackernews" +# path = "../examples/hackernews/main.rs" + + diff --git a/capp/src/manager/worker.rs b/capp/src/manager/worker.rs index 34072db..70fe87c 100644 --- a/capp/src/manager/worker.rs +++ b/capp/src/manager/worker.rs @@ -8,6 +8,7 @@ use tokio::sync::{ broadcast, mpsc::{self, error::TryRecvError}, }; +use tracing::{debug, error, info, info_span, warn}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct WorkerId(usize); @@ -82,11 +83,7 @@ where // Implement limiting amount of tasks per worker if let Some(limit) = self.options.task_limit { if self.stats.tasks_processed >= limit { - tracing::info!( - "[{}] task_limit reached: {}", - self.worker_id, - limit - ); + warn!("[{}] task_limit reached: {}", self.worker_id, limit); return Ok(false); } }; @@ -110,11 +107,9 @@ where task.set_succeed(); self.queue.set(&task).await.unwrap(); self.queue.ack(&task.task_id).await.unwrap(); - tracing::info!( + info!( "[{}] Task {} succeed: {:?}", - self.worker_id, - &task.task_id, - &task.payload + self.worker_id, &task.task_id, &task.payload ); // record stats on success @@ -125,22 +120,16 @@ where task.set_retry(&err.to_string()); if task.retries < self.options.max_retries { self.queue.push(&task).await.unwrap(); - tracing::error!( + error!( "[{}] Task {} failed, retrying ({}): {:?}", - self.worker_id, - &task.task_id, - &task.retries, - &err + self.worker_id, &task.task_id, &task.retries, &err ); } else { task.set_dlq("Max retries"); self.queue.nack(&task).await.unwrap(); - tracing::error!( + error!( "[{}] Task {} failed, max reties ({}): {:?}", - self.worker_id, - &task.task_id, - &task.retries, - &err + self.worker_id, &task.task_id, &task.retries, &err ); } @@ -150,7 +139,7 @@ where } } Err(TaskQueueError::QueueEmpty) => { - tracing::warn!("[{}] No tasks found, waiting...", self.worker_id); + warn!("[{}] No tasks found, waiting...", self.worker_id); // wait for a while till try to fetch task tokio::time::sleep(self.options.no_task_found_delay).await; } @@ -220,13 +209,13 @@ pub async fn worker_wrapper( 'worker: loop { tokio::select! { _ = terminate.recv() => { - tracing::info!("Terminating immediately"); + info!("Terminating immediately"); return; }, run_result = worker.run(), if !should_stop => { match commands.try_recv() { Ok(WorkerCommand::Shutdown) => { - tracing::error!("[{}] Shutdown received", worker_id); + error!("[{}] Shutdown received", worker_id); should_stop = true; } Err(TryRecvError::Disconnected) => break 'worker, @@ -245,15 +234,12 @@ pub async fn worker_wrapper( // If a stop command was received, finish any ongoing work and then exit. if should_stop { - tracing::info!( - "[{}] Completing current task before stopping.", - worker_id - ); + info!("[{}] Completing current task before stopping.", worker_id); break; } } - tracing::info!("completed"); + info!("completed"); } /// This wrapper used to create new Worker setup internal logging @@ -287,7 +273,7 @@ pub async fn worker_wrapper_old( let mut should_stop = false; // setup spans - let span = tracing::info_span!("worker", _id = %worker_id); + let span = info_span!("worker", _id = %worker_id); let _enter = span.enter(); 'worker: loop { diff --git a/examples/basic.rs b/examples/basic.rs index b90273e..2242df5 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -5,8 +5,9 @@ use capp::prelude::{ use capp::{ config::Configurable, manager::{WorkersManager, WorkersManagerOptionsBuilder}, - queue::{AbstractTaskQueue, InMemoryTaskQueue, TaskQueue}, - task::Task, + queue::{ + AbstractTaskQueue, InMemoryTaskQueue, JsonSerializer, Task, TaskQueue, + }, }; use serde::{Deserialize, Serialize}; use std::{path, sync::Arc}; @@ -76,7 +77,8 @@ impl Computation for DivisionComputation { /// total tasks = 9 /// number of failed tasks = 4 async fn make_storage() -> impl TaskQueue + Send + Sync { - let storage = InMemoryTaskQueue::new(); + let storage: InMemoryTaskQueue = + InMemoryTaskQueue::new(); for i in 1..=5 { let task: Task = Task::new(TaskData { diff --git a/notes.org b/notes.org index 41295c1..c0f4a4d 100644 --- a/notes.org +++ b/notes.org @@ -1,13 +1,15 @@ #+title: Notes * Major Tasks +** Shared State (to run like a server with REST?) +** tui interface ** consider to use https://crates.io/crates/config instead of current configuration? ** figure if it can get rid of returning yaml_value in favour of some generic configuration type ** DONE fix RoundRobin queue i think it's done ** Redis backend should return RedisError, now custom ones. -** TODO mongodb queue -** TODO postgres queue +** DONE mongodb queue +** DONE postgres queue ** TODO benchmarks criterion * Tasks diff --git a/review.md b/review.md new file mode 100644 index 0000000..d694372 --- /dev/null +++ b/review.md @@ -0,0 +1,85 @@ +# Code Review: CAPP (Comprehensive Asynchronous Parallel Processing) Framework + +## Overview +CAPP is a Rust framework for building distributed task processing systems, +with a particular focus on web crawlers. The codebase demonstrates strong Rust +practices and a well-thought-out architecture. + +## Architecture Analysis + +### Core Components + +1. **Task Queue System** + - Multiple backend implementations (Redis, MongoDB, Postgres, In-Memory) + - Generic task handling with serialization support + - Dead Letter Queue (DLQ) for failed tasks + - Round-robin task distribution capability + +2. **Worker Management** + - Concurrent worker execution with configurable limits + - Graceful shutdown handling + - Per-worker statistics tracking + - Task retry mechanism with configurable policies + +3. **Configuration System** + - YAML-based configuration + - Proxy support with round-robin and random selection + - Environment variable integration + - Flexible HTTP client configuration + +### Design Patterns + +1. **Builder Pattern** + - Effectively used for WorkerOptions and WorkersManagerOptions + - Clean configuration initialization + - Clear default values + +2. **Trait-based Abstraction** + - `TaskQueue` trait for storage backends + - `Computation` trait for task processing + - `TaskSerializer` for data serialization + +3. **Error Handling** + - Custom error types with thiserror + - Proper error propagation + - Contextual error messages + +## Strengths + +1. **Modularity** + - Clean separation between components + - Feature flags for optional components + - Well-defined interfaces + +2. **Concurrency Control** + - Proper use of tokio for async operations + - Thread-safe shared state handling + - Graceful shutdown mechanisms + +3. **Testing** + - Comprehensive test coverage + - Integration tests for each backend + - Mock implementations for testing + +## Areas for Improvement + +1. **Documentation** + - While generally good, some public APIs lack detailed examples + - More inline documentation for complex algorithms would be helpful + - Consider adding architecture diagrams + +2. **Error Handling Enhancements** + ```rust + // Current: + pub enum TaskQueueError { + QueueError(String), + SerdeError(String), + // ... + } + + // Suggestion: Add more context + pub enum TaskQueueError { + QueueError { message: String, context: String }, + SerdeError { message: String, data_type: String }, + // ... + }