Skip to content

Commit

Permalink
Merge branch 'ui_stability' into lts2_client
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/rust/Cargo.lock
  • Loading branch information
thebracket committed Oct 24, 2024
2 parents 572a3b5 + eeea4ac commit f7d3f5c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 54 deletions.
132 changes: 86 additions & 46 deletions src/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ num-traits = "0.2.19"
clap = { version = "4", features = ["derive"] }
timerfd = "1.6"
crossbeam-channel = { version = "0.5" }
crossbeam-queue = "0.3.11"
arc-swap = "1.7.1"

# May have to change this one for ARM?
jemallocator = "0.5"
3 changes: 2 additions & 1 deletion src/rust/lqosd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ time = { version = "0.3.36", features = ["serde"] }
serde_cbor = { workspace = true }
timerfd = { workspace = true }
crossbeam-channel = { workspace = true }
arc-swap = "1.7.1"
arc-swap = { workspace = true }
crossbeam-queue = { workspace = true }


# Support JemAlloc on supported platforms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
use tracing::{warn, error};
use zerocopy::FromBytes;
use std::sync::OnceLock;
use once_cell::sync::Lazy;

static EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
static EVENTS_PER_SECOND: AtomicU64 = AtomicU64::new(0);
Expand Down Expand Up @@ -128,8 +129,9 @@ pub struct FlowActor {}

const EVENT_SIZE: usize = size_of::<FlowbeeEvent>();

static FLOW_BYTES_SENDER: OnceLock<crossbeam_channel::Sender<Box<[u8; EVENT_SIZE]>>> = OnceLock::new();
static FLOW_BYTES_SENDER: OnceLock<crossbeam_channel::Sender<()>> = OnceLock::new();
static FLOW_COMMAND_SENDER: OnceLock<crossbeam_channel::Sender<FlowCommands>> = OnceLock::new();
static FLOW_BYTES: Lazy<crossbeam_queue::ArrayQueue<[u8; EVENT_SIZE]>> = Lazy::new(|| crossbeam_queue::ArrayQueue::new(65536*2));

#[derive(Debug)]
enum FlowCommands {
Expand All @@ -139,7 +141,7 @@ enum FlowCommands {

impl FlowActor {
pub fn start() -> anyhow::Result<()> {
let (tx, rx) = crossbeam_channel::bounded::<Box<[u8; EVENT_SIZE]>>(65536);
let (tx, rx) = crossbeam_channel::bounded::<()>(65536);
// Placeholder for when you need to read the flow system.
let (cmd_tx, cmd_rx) = crossbeam_channel::bounded::<FlowCommands>(16);

Expand Down Expand Up @@ -181,8 +183,10 @@ impl FlowActor {
}
// A flow event arrives
recv(rx) -> msg => {
if let Ok(msg) = msg {
FlowActor::receive_flow(&mut flows, msg.as_slice());
if let Ok(_) = msg {
while let Some(msg) = FLOW_BYTES.pop() {
FlowActor::receive_flow(&mut flows, msg.as_slice());
}
}
}
}
Expand Down Expand Up @@ -257,9 +261,10 @@ pub unsafe extern "C" fn flowbee_handle_events(
// Copy the bytes (to free the ringbuffer slot)
let data_u8 = data as *const u8;
let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
let target: Box<[u8; EVENT_SIZE]> = Box::new(data_slice.try_into().unwrap());
if tx.try_send(target).is_err() {
warn!("Could not submit flow event - buffer full");
if let Ok(_) = FLOW_BYTES.push(data_slice.try_into().unwrap()) {
if tx.try_send(()).is_err() {
warn!("Could not submit flow event - buffer full");
}
}
} else {
warn!("Flow ringbuffer data arrived before the actor is ready. Dropping it.");
Expand Down

0 comments on commit f7d3f5c

Please sign in to comment.