diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 5cd0baea..07d0776e 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -718,6 +718,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -834,13 +843,15 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.1.0" +version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0834a35a3fce649144119e18da2a4d8ed12ef3862f47183fd46f625d072d96c" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "num_cpus", - "parking_lot", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", ] [[package]] @@ -873,7 +884,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", - "serde", ] [[package]] @@ -1036,15 +1046,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fsevent-sys" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" -dependencies = [ - "libc", -] - [[package]] name = "futures" version = "0.3.30" @@ -1847,7 +1848,6 @@ name = "lqos_utils" version = "0.1.0" dependencies = [ "byteorder", - "crossbeam-channel", "nix", "notify", "num-traits", @@ -1867,6 +1867,7 @@ dependencies = [ "axum-extra", "bincode", "crossbeam-channel", + "crossbeam-queue", "csv", "dashmap", "default-net", @@ -1876,7 +1877,6 @@ dependencies = [ "ip_network_table", "itertools 0.12.1", "jemallocator", - "libloading", "lqos_bus", "lqos_config", "lqos_heimdall", @@ -1884,7 +1884,6 @@ dependencies = [ "lqos_support_tool", "lqos_sys", "lqos_utils", - "lts2_sys", "lts_client", "mime_guess", "nix", @@ -1892,13 +1891,11 @@ dependencies = [ "rand", "reqwest", "serde", - "serde_cbor", "serde_json", "signal-hook", "strum", "surge-ping", "sysinfo", - "time", "timerfd", "tokio", "tower-http", @@ -1949,16 +1946,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "lts2_sys" -version = "0.1.0" -dependencies = [ - "anyhow", - "log", - "lqos_config", - "serde", -] - [[package]] name = "lts_client" version = "0.1.0" @@ -2167,21 +2154,18 @@ dependencies = [ [[package]] name = "notify" -version = "6.1.1" +version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486" dependencies = [ - "bitflags 2.6.0", - "crossbeam-channel", + "bitflags 1.3.2", "filetime", - "fsevent-sys", "inotify", "kqueue", "libc", - "log", "mio 0.8.11", "walkdir", - "windows-sys 0.48.0", + "windows-sys 0.45.0", ] [[package]] @@ -2218,16 +2202,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.9", - "libc", -] - [[package]] name = "object" version = "0.36.4" @@ -4009,6 +3983,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -4036,6 +4019,21 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -4067,6 +4065,12 @@ dependencies = [ "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -4079,6 +4083,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -4091,6 +4101,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -4109,6 +4125,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -4121,6 +4143,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -4133,6 +4161,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -4145,6 +4179,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index 91676a3c..bda9bd74 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -72,6 +72,8 @@ num-traits = "0.2.19" clap = { version = "4", features = ["derive"] } timerfd = "1.6" crossbeam-channel = { version = "0.5" } +crossbeam-queue = "0.3.11" +arc-swap = "1.7.1" # May have to change this one for ARM? jemallocator = "0.5" diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index 1d8abfda..919fe323 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -51,7 +51,8 @@ time = { version = "0.3.36", features = ["serde"] } serde_cbor = { workspace = true } timerfd = { workspace = true } crossbeam-channel = { workspace = true } -arc-swap = "1.7.1" +arc-swap = { workspace = true } +crossbeam-queue = { workspace = true } # Support JemAlloc on supported platforms diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs index c1d62a95..2e52a0ce 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs @@ -9,6 +9,7 @@ use std::{ use tracing::{warn, error}; use zerocopy::FromBytes; use std::sync::OnceLock; +use once_cell::sync::Lazy; static EVENT_COUNT: AtomicU64 = AtomicU64::new(0); static EVENTS_PER_SECOND: AtomicU64 = AtomicU64::new(0); @@ -128,8 +129,9 @@ pub struct FlowActor {} const EVENT_SIZE: usize = size_of::(); -static FLOW_BYTES_SENDER: OnceLock>> = OnceLock::new(); +static FLOW_BYTES_SENDER: OnceLock> = OnceLock::new(); static FLOW_COMMAND_SENDER: OnceLock> = OnceLock::new(); +static FLOW_BYTES: Lazy> = Lazy::new(|| crossbeam_queue::ArrayQueue::new(65536*2)); #[derive(Debug)] enum FlowCommands { @@ -139,7 +141,7 @@ enum FlowCommands { impl FlowActor { pub fn start() -> anyhow::Result<()> { - let (tx, rx) = crossbeam_channel::bounded::>(65536); + let (tx, rx) = crossbeam_channel::bounded::<()>(65536); // Placeholder for when you need to read the flow system. let (cmd_tx, cmd_rx) = crossbeam_channel::bounded::(16); @@ -181,8 +183,10 @@ impl FlowActor { } // A flow event arrives recv(rx) -> msg => { - if let Ok(msg) = msg { - FlowActor::receive_flow(&mut flows, msg.as_slice()); + if let Ok(_) = msg { + while let Some(msg) = FLOW_BYTES.pop() { + FlowActor::receive_flow(&mut flows, msg.as_slice()); + } } } } @@ -257,9 +261,10 @@ pub unsafe extern "C" fn flowbee_handle_events( // Copy the bytes (to free the ringbuffer slot) let data_u8 = data as *const u8; let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE); - let target: Box<[u8; EVENT_SIZE]> = Box::new(data_slice.try_into().unwrap()); - if tx.try_send(target).is_err() { - warn!("Could not submit flow event - buffer full"); + if let Ok(_) = FLOW_BYTES.push(data_slice.try_into().unwrap()) { + if tx.try_send(()).is_err() { + warn!("Could not submit flow event - buffer full"); + } } } else { warn!("Flow ringbuffer data arrived before the actor is ready. Dropping it.");