Skip to content

Commit

Permalink
Merge pull request #32 from oiwn/new
Browse files Browse the repository at this point in the history
New version, broke previous one.
  • Loading branch information
oiwn authored Oct 14, 2024
2 parents 1e490ae + f5dc799 commit f48d09a
Show file tree
Hide file tree
Showing 32 changed files with 1,955 additions and 977 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ Cargo.lock
.env
/tmp
tags

.DS_Store
7 changes: 5 additions & 2 deletions .tmuxp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ windows:
- window_name: notes
panes:
- shell_command:
- clear
# - emacs -nw notes.org
- emacs -nw notes.org
- window_name: redis
panes:
- shell_command:
- just connect-redis
32 changes: 21 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
[package]
name = "capp"
version = "0.3.5"
version = "0.4.1"
edition = "2021"
license = "MIT"
authors = ["oiwn"]
description = "Common things i use to build Rust CLI tools for web crawlers."
homepage = "https://github.com/oiwn/capp-rs"
repository = "https://github.com/oiwn/capp-rs"
readme = "README.md"
keywords = ["mini-celery", "async", "executor"]
keywords = ["web-crawler", "async", "executor"]
categories = ["asynchronous", "web-programming", "concurrency"]
exclude = [
"tmp/*",
Expand All @@ -20,25 +20,35 @@ exclude = [
"notes.org"
]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "capp"
path = "src/lib.rs"
doctest = false

[[bin]]
name = "capp"
path = "src/main.rs"


[dependencies]
async-trait = { version = "0.1" }
backoff = { version = "0.4", optional = true, features = ["tokio"] }
derive_builder = { version = "0.20" }
reqwest = { version = "0.12", features = ["gzip", "rustls-tls"], optional = true }
reqwest = { version = "0.12", features = ["gzip", "rustls-tls", "json"], optional = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
serde_yaml = "0.9"
thiserror = { version = "1.0" }
tokio = { version = "1.39", features = ["full"] }
thiserror = { version = "1" }
tokio = { version = "1.40", features = ["full"] }
uuid = { version = "1.10", features = ["v4", "serde"] }
rustis = { version = "0.13", features = ["tokio-runtime"], optional = true }
once_cell = "1.19"
tracing = "0.1"
tracing-subscriber = "0.3"
anyhow = "1.0"
anyhow = "1"
tracing-futures = "0.2"
indexmap = "2.6"
url = "2.5"
regex = "1.11"

[dev-dependencies]
capp = { path = ".", features = ["http", "healthcheck", "redis"] }
Expand All @@ -49,9 +59,9 @@ pin-project-lite = "0.2"
dotenvy = "0.15"
scraper = "0.19"
rand = "0.8"
md5 = "0.7.0"
url = "2.5.0"
base64 = "0.22.1"
md5 = "0.7"
url = "2.5"
base64 = "0.22"

[features]
http = ["dep:backoff", "dep:reqwest"]
Expand Down
10 changes: 10 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Common cli tasks

# Rebuild the ctags index for the repo, skipping JSON fixtures and build output.
tags:
ctags -R --exclude=*/*.json --exclude=target/* .

# Line-count summary via pygount; skips build/data/vcs dirs and generated files
# (the ctags `tags` file and any HTML).
lines:
pygount --format=summary --folders-to-skip=target,data,__pycache__,.git --names-to-skip=tags,*.html

# Open an interactive redis-cli inside the `redis-stack` container;
# --askpass prompts for the password instead of taking it on the command line.
connect-redis:
docker exec -it redis-stack redis-cli --askpass
6 changes: 0 additions & 6 deletions Makefile

This file was deleted.

37 changes: 17 additions & 20 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use capp::prelude::{
use capp::{
config::Configurable,
manager::{WorkersManager, WorkersManagerOptionsBuilder},
storage::{InMemoryTaskStorage, Task, TaskStorage},
queue::{AbstractTaskQueue, InMemoryTaskQueue, TaskQueue},
task::Task,
};
use serde::{Deserialize, Serialize};
use std::{path, sync::Arc};
Expand All @@ -22,7 +23,6 @@ pub struct DivisionComputation;

#[derive(Debug)]
pub struct Context {
name: String,
config: serde_yaml::Value,
}

Expand All @@ -36,7 +36,6 @@ impl Context {
fn from_config(config_file_path: impl AsRef<path::Path>) -> Self {
let config = Self::load_config(config_file_path);
Self {
name: "test-app".to_string(),
config: config.unwrap(),
}
}
Expand All @@ -47,28 +46,26 @@ impl Computation<TaskData, Context> for DivisionComputation {
/// TaskRunner will fail tasks which value can't be divided by 3
async fn call(
&self,
_worker_id: WorkerId,
ctx: Arc<Context>,
_storage: Arc<dyn TaskStorage<TaskData> + Send + Sync>,
worker_id: WorkerId,
_ctx: Arc<Context>,
_queue: AbstractTaskQueue<TaskData>,
task: &mut Task<TaskData>,
) -> Result<(), ComputationError> {
// setup spans
// let span = tracing::info_span!("computation", worker_id = %worker_id);
// let _enter = span.enter();

tracing::info!("Task received to process: {:?}", task.get_payload());
tracing::info!(
"[{}] Test division task: {:?}",
worker_id,
task.get_payload()
);

let rem = task.payload.value % 3;
if rem != 0 {
let err_msg = format!("Can't divide {} by 3", task.payload.value);
let err_msg =
format!("[{}] Can't divide {} by 3", worker_id, task.payload.value);
tokio::time::sleep(tokio::time::Duration::from_secs(rem as u64)).await;
return Err(ComputationError::Function(err_msg));
};

task.payload.finished = true;
if ctx.name == "test-app".to_string() {
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
}
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
Ok(())
}
Expand All @@ -78,16 +75,16 @@ impl Computation<TaskData, Context> for DivisionComputation {
/// For current set following conditions should be true:
/// total tasks = 9
/// number of failed tasks = 4
async fn make_storage() -> impl TaskStorage<TaskData> + Send + Sync {
let storage = InMemoryTaskStorage::new();
async fn make_storage() -> impl TaskQueue<TaskData> + Send + Sync {
let storage = InMemoryTaskQueue::new();

for i in 1..=5 {
let task: Task<TaskData> = Task::new(TaskData {
domain: "one".to_string(),
value: i,
finished: false,
});
let _ = storage.task_push(&task).await;
let _ = storage.push(&task).await;
}

for i in 1..=5 {
Expand All @@ -96,7 +93,7 @@ async fn make_storage() -> impl TaskStorage<TaskData> + Send + Sync {
value: i * 3,
finished: false,
});
let _ = storage.task_push(&task).await;
let _ = storage.push(&task).await;
}

for _ in 1..=10 {
Expand All @@ -105,7 +102,7 @@ async fn make_storage() -> impl TaskStorage<TaskData> + Send + Sync {
value: 2,
finished: false,
});
let _ = storage.task_push(&task).await;
let _ = storage.push(&task).await;
}
storage
}
Expand Down
File renamed without changes.
24 changes: 12 additions & 12 deletions examples/hackernews/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
use async_trait::async_trait;
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
use capp::prelude::{
Computation, ComputationError, InMemoryTaskStorage, Task, TaskStorage,
WorkerId, WorkerOptionsBuilder, WorkersManager, WorkersManagerOptionsBuilder,
Computation, ComputationError, InMemoryTaskQueue, Task, TaskQueue, WorkerId,
WorkerOptionsBuilder, WorkersManager, WorkersManagerOptionsBuilder,
};
use capp::{config::Configurable, http, reqwest};
use capp::{tracing, tracing_subscriber};
use once_cell::sync::Lazy;
use rand::{seq::SliceRandom, thread_rng};
use scraper::{Html, Selector};
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::sync::LazyLock;
use std::{
collections::HashSet,
path,
Expand All @@ -21,8 +21,8 @@ use url::{ParseError, Url};

const SEED_URLS: [&str; 1] = ["https://news.ycombinator.com"];

static URL_SET: Lazy<Arc<Mutex<HashSet<String>>>> = Lazy::new(|| {
let mut set: HashSet<String> = HashSet::new();
static URL_SET: LazyLock<Mutex<HashSet<String>>> = LazyLock::new(|| {
let mut set = HashSet::new();
// Add some urls we do not want to add into queue
set.insert("https://news.ycombinator.com/submit".into());
set.insert("https://news.ycombinator.com/jobs".into());
Expand All @@ -31,7 +31,7 @@ static URL_SET: Lazy<Arc<Mutex<HashSet<String>>>> = Lazy::new(|| {
set.insert("https://news.ycombinator.com/newcomments".into());
set.insert("https://news.ycombinator.com/front".into());
set.insert("https://news.ycombinator.com/newest".into());
Arc::new(Mutex::new(set))
Mutex::new(set)
});

#[derive(Debug)]
Expand Down Expand Up @@ -99,7 +99,7 @@ impl Computation<SiteLink, Context> for HNCrawler {
&self,
worker_id: WorkerId,
ctx: Arc<Context>,
storage: Arc<dyn TaskStorage<SiteLink> + Send + Sync + 'static>,
storage: Arc<dyn TaskQueue<SiteLink> + Send + Sync + 'static>,
task: &mut Task<SiteLink>,
) -> Result<(), ComputationError> {
tracing::info!("[worker-{}] Processing task: {:?}", worker_id, task);
Expand Down Expand Up @@ -189,13 +189,13 @@ impl HNCrawler {
// Store links to website for further crawling
async fn store_links_website(
links: Vec<Url>,
storage: Arc<dyn TaskStorage<SiteLink> + Send + Sync>,
storage: Arc<dyn TaskQueue<SiteLink> + Send + Sync>,
) -> Result<usize, anyhow::Error> {
let mut links_stored = 0;
tracing::info!("Adding {} links to the queue...", links.len());

for link in links.iter() {
let link_str = link.as_str().to_string();
let link_str = link.as_str().to_owned();

let should_store = {
// Scoped lock acquisition
Expand All @@ -205,7 +205,7 @@ impl HNCrawler {

if should_store {
let link_data = SiteLink { url: link_str };
storage.task_push(&Task::new(link_data)).await?;
storage.push(&Task::new(link_data)).await?;
links_stored += 1;
}
}
Expand Down Expand Up @@ -353,7 +353,7 @@ async fn main() {
.build()
.unwrap();

let storage: InMemoryTaskStorage<SiteLink> = InMemoryTaskStorage::new();
let storage: InMemoryTaskQueue<SiteLink> = InMemoryTaskQueue::new();
let tasks_queue_len = storage.list.lock().unwrap().len();

tracing::info!("Website links tasks in queue: {}", tasks_queue_len);
Expand All @@ -362,7 +362,7 @@ async fn main() {
tracing::warn!("Queue is empty! Seeding urls... {}", SEED_URLS.join(" "));
for url in SEED_URLS.iter() {
let initial_task = Task::new(SiteLink::new(url));
let _ = storage.task_push(&initial_task).await;
let _ = storage.push(&initial_task).await;
}
}

Expand Down
32 changes: 32 additions & 0 deletions src/healthcheck.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use reqwest::{Client, StatusCode};
use serde_json::Value;
use tokio::time::{timeout, Duration};

// const GOOGLE: &str = "http://www.google.com";
Expand Down Expand Up @@ -37,3 +38,34 @@ pub async fn internet(http_url: &str) -> bool {
);
false
}

/// Checks whether `proxy_url` actually routes traffic through a different
/// exit IP: fetches https://httpbin.org/ip once directly and once through
/// the proxy, then compares the reported origin addresses.
///
/// Returns `false` when the proxy URL is invalid, a client cannot be
/// built, either request fails, or the proxy does not change the
/// observed IP. Never panics (the original `unwrap()`s did on a
/// malformed proxy URL).
pub async fn test_proxy(proxy_url: &str) -> bool {
    let client = Client::new();

    // A bad proxy URL or client-build failure is a failed check,
    // not a crash.
    let proxy = match reqwest::Proxy::all(proxy_url) {
        Ok(p) => p,
        Err(_) => return false,
    };
    let proxy_client = match Client::builder().proxy(proxy).build() {
        Ok(c) => c,
        Err(_) => return false,
    };

    let ip_check_url = "https://httpbin.org/ip";

    // Get local IP
    let local_ip = match get_ip(&client, ip_check_url).await {
        Ok(ip) => ip,
        Err(_) => return false,
    };

    // Get IP through proxy
    let proxy_ip = match get_ip(&proxy_client, ip_check_url).await {
        Ok(ip) => ip,
        Err(_) => return false,
    };

    // The proxy only "works" if the observed origin differs from the
    // direct connection's origin.
    local_ip != proxy_ip
}

/// Fetch `url` with `client` and pull the `"origin"` field out of the
/// JSON body (httpbin-style `{"origin": "<ip>"}` response).
///
/// A missing or non-string `origin` yields an empty string; transport
/// and JSON-decode errors propagate as `reqwest::Error`.
async fn get_ip(client: &Client, url: &str) -> Result<String, reqwest::Error> {
    let payload: Value = client.get(url).send().await?.json().await?;
    let origin = payload["origin"].as_str().unwrap_or("");
    Ok(origin.to_string())
}
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ pub mod healthcheck;
pub mod http;
pub mod manager;
pub mod prelude;
pub mod storage;
pub mod queue;
pub mod router;
pub mod task;
// #[cfg(test)]
// mod support;
// #[cfg(test)]
Expand All @@ -16,6 +18,8 @@ pub use async_trait;
#[cfg(feature = "http")]
pub use backoff;
pub use derive_builder;
pub use indexmap;
pub use regex;
#[cfg(feature = "http")]
pub use reqwest;
#[cfg(feature = "redis")]
Expand All @@ -26,4 +30,5 @@ pub use serde_yaml;
pub use thiserror;
pub use tracing;
pub use tracing_subscriber;
pub use url;
pub use uuid;
Loading

0 comments on commit f48d09a

Please sign in to comment.