From 2e07295c73a509488f7dcfc03d5ef2c3948450df Mon Sep 17 00:00:00 2001 From: epi Date: Mon, 27 Mar 2023 05:42:39 -0500 Subject: [PATCH 1/3] replaced async fuzzers mpsc with mpmc --- Cargo.toml | 3 +- Makefile.toml | 6 +- src/deciders/mod.rs | 12 +- src/fuzzers/async_fuzzer.rs | 262 +++++++++++++++++++++++++----------- 4 files changed, 197 insertions(+), 86 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 532df6f..227a2b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "feroxfuzz" -version = "1.0.0-rc.9" +version = "1.0.0-rc.10" edition = "2021" authors = ["Ben 'epi' Risher (@epi052)"] license = "Apache-2.0" @@ -65,6 +65,7 @@ lazy_static = { version = "1.4" } futures = { version = "0.3", optional = true } base64 = { version = "0.13.0", optional = true } hex = { version = "0.4.3", optional = true } +flume = { version = "0.10.14" } [dev-dependencies] http = { version = "0.2" } diff --git a/Makefile.toml b/Makefile.toml index c5fca21..f172df1 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -9,6 +9,10 @@ dependencies = ["upgrade-deps", "update"] [tasks.check] dependencies = ["clippy", "fmt", "test-lib", "test-doc", "doc"] +[tasks.test] +clear = true +dependencies = ["test-doc", "test-lib"] + # docs [tasks.doc] script = """ @@ -50,5 +54,5 @@ cargo test --all-features --doc "${@}" [tasks.test-lib] clear = true script = """ -cargo nextest run --all-features --lib "${@}" +cargo nextest run --all-features --retries 10 --lib "${@}" """ diff --git a/src/deciders/mod.rs b/src/deciders/mod.rs index 0e09b7b..22f6005 100644 --- a/src/deciders/mod.rs +++ b/src/deciders/mod.rs @@ -117,7 +117,7 @@ where // take the current action that was decided upon via decide_with_request, and the // previously decided action (if any) to arrive at what should be returned as the // current decided action - let final_action = match (action, new_action, operation) { + match (action, new_action, operation) { (None, None, _) => None, (None, Some(new_action), _) => Some(new_action), (Some(old_action), None, _) => Some(old_action), @@ -127,9 +127,7 @@ where (Some(old_action), Some(new_action), LogicOperation::Or) => { Some(old_action | new_action) } - }; - - final_action + } } /// called after an [`HttpClient`] receives a [`Response`] @@ -160,7 +158,7 @@ where // take the current action that was decided upon via decide_with_observers, and the // previously decided action (if any) to arrive at what should be returned as the // current decided action - let final_action = match (action, new_action, operation) { + match (action, new_action, operation) { (None, None, _) => None, (None, Some(new_action), _) => Some(new_action), (Some(old_action), None, _) => Some(old_action), @@ -170,9 +168,7 @@ where (Some(old_action), Some(new_action), LogicOperation::Or) => { Some(old_action | new_action) } - }; - - final_action + } } } diff --git a/src/fuzzers/async_fuzzer.rs b/src/fuzzers/async_fuzzer.rs index 116ddb8..a08f285 100644 --- a/src/fuzzers/async_fuzzer.rs +++ b/src/fuzzers/async_fuzzer.rs @@ -4,7 +4,6 @@ use std::marker::Send; use std::sync::Arc; use async_trait::async_trait; -use tokio::sync::mpsc::{self, UnboundedReceiver}; use tokio::sync::Semaphore; use tokio::task::JoinHandle; use tracing::{instrument, warn}; @@ -31,6 +30,27 @@ use crate::schedulers::Scheduler; use crate::state::SharedState; use crate::std_ext::ops::LogicOperation; +/// the number of post-processors (i.e. 
recv side of `flume::mpmc`) to handle +/// the post-send loop logic / execution +/// +/// 6 was chosen based on local testing and could be adjusted if needed +/// +/// note: if you change this value, you must also change the number of .pop +/// calls that we make on the post-processors vec in the call to +/// `tokio::join` later in this module +const NUM_POST_PROCESSORS: usize = 6; + +/// a crude way of passing information from the post-send loops +/// back up to the pre-send loop +/// +/// using return values isn't possible because the post-send loops +/// are spawned as background tasks. attempting to use `try_join` +/// doesn't work because the error that is returned is a +/// `tokio::task::JoinError`. We can't trigger an early return +/// from the post-send loop because we don't really own the +/// initial `Result` that is returned from the `tokio::spawn` call +static mut STOP_FUZZING_FLAG: bool = false; + /// internal type used to pass a single object from the `tokio::spawn` /// call to the `FuturesUnordered` stream #[derive(Debug, Clone)] @@ -198,29 +218,49 @@ where // tokio semaphore to limit the number of concurrent requests let semaphore = Arc::new(Semaphore::new(self.threads)); - // in order to process responses as they come in, we need to spawn a new task - // that will handle the responses via a mpsc channel. This means that we have + // in order to process responses as they come in, we need to spawn new tasks + // that will handle the responses via an mpmc channel. This means that we have // two loops going at any given time: one that sends requests/receives responses // and one that processes responses. In feroxfuzz terms, the first loop can be // thought of as the pre-send loop while the second loop acts as the post-send loop. - // create a channel to send requests to the async block - let (tx, rx) = mpsc::unbounded_channel(); - - // clone the deciders, observers, and processors so that they can be moved into the response - // processor's async block - let c_deciders = self.deciders.clone(); - let c_observers = self.observers.clone(); - let c_processors = self.processors.clone(); - let c_logic = self.post_send_logic; - let c_state = state.clone(); - - // kick off the response processing t - let response_processing_handle = tokio::spawn(async move { - process_responses(c_state, c_deciders, c_observers, c_processors, c_logic, rx) + // create an unbounded mpmc channel to send requests to the async block + let (tx, rx) = flume::unbounded(); + + // kick off the response processing threads + let mut post_processing_handles = Vec::with_capacity(NUM_POST_PROCESSORS); + + for _ in 0..NUM_POST_PROCESSORS { + // clone the deciders, observers, and processors so that they can be moved into the response + // processor's async block + let c_deciders = self.deciders.clone(); + let c_observers = self.observers.clone(); + let c_processors = self.processors.clone(); + let c_logic = self.post_send_logic; + let c_state = state.clone(); + let c_rx = rx.clone(); + + // each spawned post-processor uses the same mpmc recv channel to receive responses + // from the pre-send loop; changing from mpsc to mpmc dramatically sped up + // processing time since the pre-send loop could pretty easily overwhelm the + // post-send loop. 
as a result, overall scan time was dramatically reduced as well + // since we could get into situations where all requests/responses were complete + // but the single consumer was still processing responses + let handle = tokio::spawn(async move { + process_responses( + c_state, + c_deciders, + c_observers, + c_processors, + c_logic, + c_rx, + ) .await - .unwrap_or_default() - }); + .unwrap_or_default(); + }); + + post_processing_handles.push(handle); + } // first loop fires off requests and receives the responses // those responses are then sent to the second loop via the mpsc channel @@ -253,6 +293,13 @@ where } }; + if unsafe { STOP_FUZZING_FLAG } { + // if one of the post-processing tasks set the stop flag, we need to stop + // here as well. The check is placed here to catch any requests that were + // previously held by the semaphore but not yet sent + return Ok(Some(Action::StopFuzzing)); + } + let mut request = self.request.clone(); *request.id_mut() += self.request_id; @@ -373,7 +420,7 @@ where // spawn a new task to send the request, and when received, send the response // across the mpsc channel to the second/post-send loop - let sent = tx.send(Some(tokio::spawn(async move { + let sent = tx.send(tokio::spawn(async move { let cloned_request = mutated_request.clone(); // send the request, and await the response @@ -396,7 +443,7 @@ where // loop Err(err) => Err(err), } - }))); + })); // UnboundedChannel::send can only error if the receiver has called close() // on the channel, which we don't do, or the receiver has been dropped. @@ -425,26 +472,38 @@ where self.request_id += 1; } - // send a None to the receiver, to signal that we're done sending requests - if let Err(err) = tx.send(None) { - tracing::error!("Failed to tell response processing task to stop: {:?}", err); - } + // now that all requests have been spawned/sent, we can close the tx side of the + // connection. this will allow the receivers to complete when all of the requests + // have been processed + drop(tx); + + // the join! macro here is not driving the spawned tasks, rather it is waiting for + // the task handles to complete. This is the reason for the use of the + // STOP_FUZZING_FLAG, since we can't get returned error values early from the spawned + // tasks + let (first, second, third, fourth, fifth, sixth) = tokio::join!( + // note: these unwraps are ok, since the NUM_POST_PROCESSING_TASKS value is a const, without + // any possibility of user interaction. 
However, if that value changes, then the + // number of calls to .pop will need to change to reflect that + post_processing_handles.pop().unwrap(), + post_processing_handles.pop().unwrap(), + post_processing_handles.pop().unwrap(), + post_processing_handles.pop().unwrap(), + post_processing_handles.pop().unwrap(), + post_processing_handles.pop().unwrap(), + ); - // wait for all of the request futures to complete - let processed_result = match response_processing_handle.await { - Ok(result) => result, - Err(err) => { + // if any of the tasks failed, log the error and move along, nothing can really be + // done about it from here + [first, second, third, fourth, fifth, sixth] + .into_iter() + .filter_map(|result| match result { + Ok(_) => None, + Err(err) => Some(err), + }) + .for_each(|err| { tracing::error!("Failed to join response processing task: {:?}", err); - return Err(FeroxFuzzError::TaskJoinError { - message: err.to_string(), - }); - } - }; - - // if the response processing loop returned an action, return it - if let Some(action) = processed_result { - return Ok(Some(action)); - } + }); if let Some(hook) = &mut self.post_loop_hook { // call the post-loop hook if it is defined @@ -466,8 +525,8 @@ async fn process_responses( mut observers: O, mut processors: P, post_send_logic: LogicOperation, - mut receiver: UnboundedReceiver>>>, -) -> Result, FeroxFuzzError> + receiver: flume::Receiver>>, +) -> Result<(), FeroxFuzzError> where D: Deciders + Send + Clone, O: Observers + Send + Clone, @@ -478,22 +537,25 @@ where // outer loop awaits the actual response, which is a double-nested Result // Result, tokio::task::JoinError> tracing::debug!("entering the response processing loop..."); - while let Some(handle) = receiver.recv().await { - if handle.is_none() { - // if the handle is None, then the sender has closed the channel - // and we should exit the loop - break; + + while let Ok(handle) = receiver.recv_async().await { + if unsafe { STOP_FUZZING_FLAG } { + // if one task sets the stop fuzzing flag, all other tasks need to + // act on it as well, so we check for the flag at the top of the loop + // + // purposely placing this before the handle.await, so that in the event + // that the flag is set, while awaiting a given handle, we'll still + // process that last handle before exiting + return Ok(()); } - // safe to unwrap here, because we know the handle is Some - let handle = handle.unwrap().await; + let handle = handle.await; let Ok(task_result) = handle else { // tokio::task::JoinError -- task failed to execute to completion // could be a cancelled task, or one that panicked for w/e reason // // either way, we can't process the response, so we just continue - warn!("Task failed to execute to completion: {:?}", handle.err()); continue; }; @@ -553,8 +615,14 @@ where "[ID: {}] stopping fuzzing due to AddToCorpus[StopFuzzing] action", request_id ); + state.events().notify(&StopFuzzing); - return Ok(Some(Action::StopFuzzing)); + + unsafe { + STOP_FUZZING_FLAG = true; + } + + return Err(FeroxFuzzError::FuzzingStopped); } FlowControl::Discard => { state.events().notify(DiscardedResponse { id: request_id }); @@ -569,8 +637,14 @@ where "[ID: {}] stopping fuzzing due to StopFuzzing action", request_id ); + state.events().notify(&StopFuzzing); - return Ok(Some(Action::StopFuzzing)); + + unsafe { + STOP_FUZZING_FLAG = true; + } + + return Err(FeroxFuzzError::FuzzingStopped); } Some(Action::Discard) => { state.events().notify(DiscardedResponse { id: request_id }); @@ -582,7 +656,7 @@ where } } - 
Ok(None) + Ok(()) } #[cfg(test)] @@ -669,9 +743,10 @@ mod tests { Ok(()) } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// test that the fuzz loop will stop if the decider returns a StopFuzzing action /// in the post-send phase + #[allow(clippy::too_many_lines)] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_async_fuzzer_stops_fuzzing_post_send() -> Result<(), Box> { let srv = MockServer::start(); @@ -720,48 +795,83 @@ mod tests { let deciders = build_deciders!(decider); let mutators = build_mutators!(mutator); - let mut fuzzer = AsyncFuzzer::new(5) - .client(client) - .request(request) - .scheduler(scheduler) - .mutators(mutators) - .observers(observers) - .deciders(deciders) + let mut fuzzer = AsyncFuzzer::new(1) + .client(client.clone()) + .request(request.clone()) + .scheduler(scheduler.clone()) + .mutators(mutators.clone()) + .observers(observers.clone()) + .deciders(deciders.clone()) .build(); fuzzer.fuzz_once(&mut state).await?; - // /0 sent/recv'd and ok - // /1 sent/recv'd and bad - // /2 never *processed* - // - // in an async context, this works ok by itself with a threadcount of 1, but the other request - // is still in-flight and will likely hit the target, this matters for the following test - // assertions as the expected count is more than what one may think is accurate if let Ok(guard) = state.stats().read() { assert!((guard.requests() - 2.0).abs() < std::f64::EPSILON); assert_eq!(guard.status_code_count(200).unwrap(), 1); assert_eq!(guard.status_code_count(201).unwrap(), 1); + assert_eq!( + guard + .actions() + .get("response") + .unwrap() + .get(&Action::StopFuzzing) + .unwrap(), + &1 + ); } - fuzzer.scheduler.reset(); + fuzzer = AsyncFuzzer::new(1) + .client(client.clone()) + .request(request.clone()) + .scheduler(scheduler.clone()) + .mutators(mutators.clone()) + .observers(observers.clone()) + .deciders(deciders.clone()) + .build(); + fuzzer.fuzz_n_iterations(2, &mut state).await?; - // at this point, /2 was hit from the previous test, so we're 1 higher than expected if let Ok(guard) = state.stats().read() { - assert!((guard.requests() - 4.0).abs() < std::f64::EPSILON); - assert_eq!(guard.status_code_count(200).unwrap(), 2); - assert_eq!(guard.status_code_count(201).unwrap(), 2); + assert!((guard.requests() - 2.0).abs() < std::f64::EPSILON); + assert_eq!(guard.status_code_count(200).unwrap(), 1); + assert_eq!(guard.status_code_count(201).unwrap(), 1); + assert_eq!( + guard + .actions() + .get("response") + .unwrap() + .get(&Action::StopFuzzing) + .unwrap(), + &1 + ); } - fuzzer.scheduler.reset(); + fuzzer = AsyncFuzzer::new(1) + .client(client) + .request(request) + .scheduler(scheduler) + .mutators(mutators) + .observers(observers) + .deciders(deciders) + .build(); + fuzzer.fuzz(&mut state).await?; // at this point, /2 was hit from both previous tests, so we're 2 higher than expected if let Ok(guard) = state.stats().read() { - assert!((guard.requests() - 6.0).abs() < std::f64::EPSILON); - assert_eq!(guard.status_code_count(200).unwrap(), 3); - assert_eq!(guard.status_code_count(201).unwrap(), 3); + assert!((guard.requests() - 2.0).abs() < std::f64::EPSILON); + assert_eq!(guard.status_code_count(200).unwrap(), 1); + assert_eq!(guard.status_code_count(201).unwrap(), 1); + assert_eq!( + guard + .actions() + .get("response") + .unwrap() + .get(&Action::StopFuzzing) + .unwrap(), + &1 + ); } // the take away is that the fuzz/fuzz_n_iterations methods stop when told to, even though From 8402d75fdf0423839267b177b63b9dabf853bab3 
Mon Sep 17 00:00:00 2001 From: epi Date: Mon, 27 Mar 2023 05:55:05 -0500 Subject: [PATCH 2/3] upgraded libafl to 0.9.0 --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 227a2b3..a7e495a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ tokio = { version = "1.20", optional = true, features = [ num = { version = "0.4" } cfg-if = { version = "1.0" } dyn-clone = { version = "1.0.9" } -libafl = { version = "0.8.2", default-features = false, features = ["std"] } +libafl = { version = "0.9.0", default-features = false, features = ["std"] } url = { version = "2.2", features = ["serde"] } ## optional serde = { version = "1.0", optional = true, features = ["derive", "rc"] } @@ -63,7 +63,7 @@ regex = { version = "1.6" } serde_regex = { version = "1.1.0" } lazy_static = { version = "1.4" } futures = { version = "0.3", optional = true } -base64 = { version = "0.13.0", optional = true } +base64 = { version = "0.13.1", optional = true } hex = { version = "0.4.3", optional = true } flume = { version = "0.10.14" } From 83b588bd0bf4a6df859846975c2154227deb5717 Mon Sep 17 00:00:00 2001 From: epi Date: Mon, 27 Mar 2023 06:03:56 -0500 Subject: [PATCH 3/3] clippy --- src/fuzzers/async_fuzzer.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/fuzzers/async_fuzzer.rs b/src/fuzzers/async_fuzzer.rs index a08f285..e777361 100644 --- a/src/fuzzers/async_fuzzer.rs +++ b/src/fuzzers/async_fuzzer.rs @@ -279,6 +279,10 @@ where // the semaphore only has self.threads permits, so this will block // until one is available, limiting the number of concurrent requests + // + // for the clippy allow: as far as I can tell, this is a false positive since + // we actually take ownership of the permit in the match arm + #[allow(clippy::significant_drop_in_scrutinee)] let permit = match semaphore.clone().acquire_owned().await { Ok(permit) => permit, Err(err) => {
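
The core change in patch 1 is swapping tokio's single-consumer mpsc channel for a flume mpmc channel so that several spawned tasks can drain responses concurrently. Below is a minimal, self-contained sketch of that fan-out shape, assuming tokio (with the rt-multi-thread and macros features) and flume as dependencies; the item type, the stand-in processing logic, and all names are illustrative, not feroxfuzz's actual types. It also uses an Arc'd AtomicBool where the patch uses `static mut STOP_FUZZING_FLAG`, since an atomic gives the same crude back-channel without the `unsafe` accesses:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

const NUM_POST_PROCESSORS: usize = 6;

#[tokio::main]
async fn main() {
    // unbounded mpmc channel: one producer (the pre-send loop), many consumers
    let (tx, rx) = flume::unbounded::<u64>();

    // stand-in for the patch's STOP_FUZZING_FLAG
    let stop = Arc::new(AtomicBool::new(false));

    let mut handles = Vec::with_capacity(NUM_POST_PROCESSORS);

    for _ in 0..NUM_POST_PROCESSORS {
        // cloning the flume Receiver is what tokio's mpsc Receiver can't do;
        // each clone competes for items, spreading the post-send work
        let c_rx = rx.clone();
        let c_stop = stop.clone();

        handles.push(tokio::spawn(async move {
            // recv_async returns Err only after every Sender has been dropped
            while let Ok(item) = c_rx.recv_async().await {
                if c_stop.load(Ordering::Relaxed) {
                    return; // another consumer decided to stop
                }
                // stand-in for the decider/observer/processor logic
                if item == 42 {
                    c_stop.store(true, Ordering::Relaxed);
                    return;
                }
            }
        }));
    }

    // pre-send loop stand-in: check the flag before each send
    for i in 0..1_000 {
        if stop.load(Ordering::Relaxed) {
            break;
        }
        if tx.send(i).is_err() {
            break; // every receiver has already exited
        }
    }

    // dropping the only Sender lets the consumers drain and exit
    drop(tx);

    for handle in handles {
        if let Err(err) = handle.await {
            eprintln!("post-processor failed to join: {err:?}");
        }
    }
}

The property the patch is buying is visible in the `rx.clone()` line: tokio's `mpsc::Receiver` cannot be cloned, so the old shape forced a single task to drain every response, while flume's `Receiver` is `Clone` and distributes items across however many consumers exist.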
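
The doc comment added in patch 1 warns that `NUM_POST_PROCESSORS` and the six hand-written `.pop().unwrap()` calls in the `tokio::join!` block must be kept in sync by hand. For contrast, here is a sketch of one way to drop that coupling, assuming the handles are collected in a `Vec` as in the patch; `join_all` comes from the futures crate, which is already listed as an optional dependency in Cargo.toml, and the function name is illustrative:

use futures::future::join_all;
use tokio::task::JoinHandle;

// awaits however many post-processor handles exist, so changing the
// consumer count no longer requires touching the join site
async fn join_post_processors(handles: Vec<JoinHandle<()>>) {
    for result in join_all(handles).await {
        if let Err(err) = result {
            // mirrors the patch: log the JoinError and move along
            tracing::error!("Failed to join response processing task: {:?}", err);
        }
    }
}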
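
Patch 3's comment concerns `clippy::significant_drop_in_scrutinee` firing on `match semaphore.clone().acquire_owned().await`. A small illustration of the underlying pattern, with illustrative values and names: `acquire_owned` takes an `Arc`'d semaphore and yields an `OwnedSemaphorePermit` that each spawned task carries and drops itself, which is why the lint reads as a false positive here:

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // e.g. self.threads == 5: at most 5 requests in flight at once
    let semaphore = Arc::new(Semaphore::new(5));

    let mut handles = Vec::new();

    for i in 0..20u32 {
        // awaiting here blocks the pre-send loop once all permits are held
        let permit = semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore closed");

        handles.push(tokio::spawn(async move {
            // stand-in for sending one mutated request
            println!("request {i} in flight");
            // the permit moved into this task and is released when dropped
            drop(permit);
        }));
    }

    for handle in handles {
        let _ = handle.await;
    }
}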