From 48d65be20659360181b24b83d8f1915c5da8229b Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Thu, 24 Oct 2024 11:38:24 -0500 Subject: [PATCH 1/2] Replace the current flow processing setup with a crossbeam-queue SeqQueue setup to better reuse memory and avoid thrashing. --- src/rust/Cargo.lock | 15 ++++++++++++--- src/rust/Cargo.toml | 2 ++ src/rust/lqosd/Cargo.toml | 3 ++- .../flow_data/flow_analysis/kernel_ringbuffer.rs | 15 +++++++++------ 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index faf62304..07d0776e 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -718,6 +718,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -834,12 +843,11 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "crossbeam-utils", "hashbrown", "lock_api", "once_cell", @@ -1859,6 +1867,7 @@ dependencies = [ "axum-extra", "bincode", "crossbeam-channel", + "crossbeam-queue", "csv", "dashmap", "default-net", diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index abbf62a3..35c19420 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -71,6 +71,8 @@ num-traits = "0.2.19" clap = { version = "4", features = ["derive"] } timerfd = "1.6" crossbeam-channel = { version = "0.5" } +crossbeam-queue = "0.3.11" +arc-swap = "1.7.1" # May have to change this one for ARM? jemallocator = "0.5" diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index 6ff0b7ea..04f4ce2f 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -47,7 +47,8 @@ rand = "0.8.5" mime_guess = "2.0.4" timerfd = { workspace = true } crossbeam-channel = { workspace = true } -arc-swap = "1.7.1" +arc-swap = { workspace = true } +crossbeam-queue = { workspace = true } # Support JemAlloc on supported platforms [target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies] diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs index c1d62a95..4eba944b 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs @@ -128,8 +128,9 @@ pub struct FlowActor {} const EVENT_SIZE: usize = size_of::(); -static FLOW_BYTES_SENDER: OnceLock>> = OnceLock::new(); +static FLOW_BYTES_SENDER: OnceLock> = OnceLock::new(); static FLOW_COMMAND_SENDER: OnceLock> = OnceLock::new(); +static FLOW_BYTES: crossbeam_queue::SegQueue<[u8; EVENT_SIZE]> = crossbeam_queue::SegQueue::new(); #[derive(Debug)] enum FlowCommands { @@ -139,7 +140,7 @@ enum FlowCommands { impl FlowActor { pub fn start() -> anyhow::Result<()> { - let (tx, rx) = crossbeam_channel::bounded::>(65536); + let (tx, rx) = crossbeam_channel::bounded::<()>(65536); // Placeholder for when you need to read the flow system. let (cmd_tx, cmd_rx) = crossbeam_channel::bounded::(16); @@ -181,8 +182,10 @@ impl FlowActor { } // A flow event arrives recv(rx) -> msg => { - if let Ok(msg) = msg { - FlowActor::receive_flow(&mut flows, msg.as_slice()); + if let Ok(_) = msg { + while let Some(msg) = FLOW_BYTES.pop() { + FlowActor::receive_flow(&mut flows, msg.as_slice()); + } } } } @@ -257,8 +260,8 @@ pub unsafe extern "C" fn flowbee_handle_events( // Copy the bytes (to free the ringbuffer slot) let data_u8 = data as *const u8; let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE); - let target: Box<[u8; EVENT_SIZE]> = Box::new(data_slice.try_into().unwrap()); - if tx.try_send(target).is_err() { + FLOW_BYTES.push(data_slice.try_into().unwrap()); + if tx.try_send(()).is_err() { warn!("Could not submit flow event - buffer full"); } } else { From eeea4acf147b4f6ed725d340ccc622b88e8e1461 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Thu, 24 Oct 2024 11:58:53 -0500 Subject: [PATCH 2/2] Use a static flow arena. --- .../flow_data/flow_analysis/kernel_ringbuffer.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs index 4eba944b..2e52a0ce 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs @@ -9,6 +9,7 @@ use std::{ use tracing::{warn, error}; use zerocopy::FromBytes; use std::sync::OnceLock; +use once_cell::sync::Lazy; static EVENT_COUNT: AtomicU64 = AtomicU64::new(0); static EVENTS_PER_SECOND: AtomicU64 = AtomicU64::new(0); @@ -130,7 +131,7 @@ const EVENT_SIZE: usize = size_of::(); static FLOW_BYTES_SENDER: OnceLock> = OnceLock::new(); static FLOW_COMMAND_SENDER: OnceLock> = OnceLock::new(); -static FLOW_BYTES: crossbeam_queue::SegQueue<[u8; EVENT_SIZE]> = crossbeam_queue::SegQueue::new(); +static FLOW_BYTES: Lazy> = Lazy::new(|| crossbeam_queue::ArrayQueue::new(65536*2)); #[derive(Debug)] enum FlowCommands { @@ -260,9 +261,10 @@ pub unsafe extern "C" fn flowbee_handle_events( // Copy the bytes (to free the ringbuffer slot) let data_u8 = data as *const u8; let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE); - FLOW_BYTES.push(data_slice.try_into().unwrap()); - if tx.try_send(()).is_err() { - warn!("Could not submit flow event - buffer full"); + if let Ok(_) = FLOW_BYTES.push(data_slice.try_into().unwrap()) { + if tx.try_send(()).is_err() { + warn!("Could not submit flow event - buffer full"); + } } } else { warn!("Flow ringbuffer data arrived before the actor is ready. Dropping it.");