Clean up a bunch of warnings
thebracket committed Nov 13, 2024
1 parent 330065b commit 53e3a17
Showing 8 changed files with 53 additions and 71 deletions.
3 changes: 3 additions & 0 deletions src/rust/lqos_sys/src/bpf_iterator.rs
@@ -258,6 +258,9 @@ pub fn end_flows(flows: &mut [FlowbeeKey]) -> anyhow::Result<()> {
Ok(())
}

/// Expire all throughput data for the given keys
/// This uses the bulk delete method, which is faster than
/// the per-row method due to only having one lock.
pub fn expire_throughput(mut keys: Vec<XdpIpAddress>) -> anyhow::Result<()> {
let mut map = BpfMap::<XdpIpAddress, HostCounter>::from_path("/sys/fs/bpf/map_traffic")?;
map.clear_bulk_keys(&mut keys)?;
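The new doc comment spells out why `expire_throughput` exists: `clear_bulk_keys` removes every key in the batch while holding the map lock once, instead of re-locking for each row. The trade-off can be illustrated with an ordinary `Mutex<HashMap>` — a simplified analogy, not the real `BpfMap` API:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Per-row deletion: the lock is taken and released once per key.
fn delete_per_row(map: &Mutex<HashMap<u32, u64>>, keys: &[u32]) {
    for key in keys {
        let mut guard = map.lock().unwrap(); // one lock round-trip per key
        guard.remove(key);
    }
}

/// Bulk deletion: the lock is taken once for the whole batch, which is the
/// behaviour the doc comment describes for `clear_bulk_keys`.
fn delete_bulk(map: &Mutex<HashMap<u32, u64>>, keys: &[u32]) {
    let mut guard = map.lock().unwrap(); // single lock acquisition
    for key in keys {
        guard.remove(key);
    }
}

fn main() {
    let map = Mutex::new(HashMap::from([(1u32, 100u64), (2, 200), (3, 300)]));
    delete_per_row(&map, &[1]);
    delete_bulk(&map, &[2, 3]);
    assert!(map.lock().unwrap().is_empty());
}
```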
5 changes: 5 additions & 0 deletions src/rust/lqosd/src/main.rs
@@ -14,8 +14,11 @@ mod preflight_checks;
mod node_manager;
mod system_stats;

#[cfg(feature = "flamegraphs")]
use std::io::Write;
use std::net::IpAddr;

#[cfg(feature = "flamegraphs")]
use allocative::Allocative;
use crate::{
file_lock::FileLock,
@@ -44,7 +47,9 @@ use crate::ip_mapping::clear_hot_cache;
//use mimalloc::MiMalloc;

use tracing::level_filters::LevelFilter;
#[cfg(feature = "flamegraphs")]
use crate::throughput_tracker::flow_data::{ALL_FLOWS, RECENT_FLOWS};
#[cfg(feature = "flamegraphs")]
use crate::throughput_tracker::THROUGHPUT_TRACKER;
// Use JemAllocator only on supported platforms
//#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
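The `main.rs` changes gate imports that are only used by the flamegraph tooling behind the `flamegraphs` cargo feature, so default builds no longer emit unused-import warnings for them. A minimal standalone sketch of the pattern (the feature name comes from the diff; the function and file name are illustrative):

```rust
// Only pull in Write (and the other flamegraph-only imports) when the
// `flamegraphs` feature is enabled; default builds never see the import,
// so it can no longer trigger an unused-import warning.
#[cfg(feature = "flamegraphs")]
use std::io::Write;

#[cfg(feature = "flamegraphs")]
fn dump_flamegraph_report(text: &str) -> std::io::Result<()> {
    // Illustrative only: write a report to disk when the feature is on.
    let mut file = std::fs::File::create("flamegraph-report.txt")?;
    file.write_all(text.as_bytes())
}

#[cfg(not(feature = "flamegraphs"))]
fn dump_flamegraph_report(_text: &str) -> std::io::Result<()> {
    Ok(()) // no-op when the feature is disabled
}

fn main() {
    let _ = dump_flamegraph_report("demo");
}
```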
6 changes: 3 additions & 3 deletions src/rust/lqosd/src/node_manager/local_api/unknown_ips.rs
@@ -24,9 +24,9 @@ pub fn get_unknown_ips() -> Vec<UnknownIp> {
.lock()
.unwrap()
.iter()
.filter(|(k,v)| !k.as_ip().is_loopback())
.filter(|(k,d)| d.tc_handle.as_u32() == 0)
.filter(|(k,d)| {
.filter(|(k,_v)| !k.as_ip().is_loopback())
.filter(|(_k,d)| d.tc_handle.as_u32() == 0)
.filter(|(k,_d)| {
let ip = k.as_ip();
!sd_reader.trie.longest_match(ip).is_some()
})
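Most of the remaining churn in this commit is the same mechanical fix, applied here and throughout `mod.rs` and `tracking_data.rs` below: closure parameters that destructure a `(key, value)` pair but only use one side get a leading underscore, which tells rustc the binding is intentionally unused and silences the `unused_variables` warning. A standalone sketch of the pattern on a plain `HashMap` (illustrative data, not the real tracker types):

```rust
use std::collections::HashMap;

fn main() {
    let mut shapers: HashMap<&str, u32> = HashMap::new();
    shapers.insert("10.0.0.1", 0);
    shapers.insert("10.0.0.2", 7);

    // Before: |(k, v)| with only one side used -> warning: unused variable.
    // After: prefix the unused binding with `_` to state the intent.
    let unshaped: Vec<&&str> = shapers
        .iter()
        .filter(|(_k, tc_handle)| **tc_handle == 0) // key intentionally ignored
        .map(|(k, _v)| k)                           // value intentionally ignored
        .collect();

    println!("unshaped hosts: {unshaped:?}");
}
```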
@@ -29,7 +29,7 @@ pub async fn circuit_capacity(channels: Arc<PubSub>) {
let mut circuits: HashMap<String, CircuitAccumulator> = HashMap::new();

// Aggregate the data by circuit id
THROUGHPUT_TRACKER.raw_data.lock().unwrap().iter().for_each(|(k,c)| {
THROUGHPUT_TRACKER.raw_data.lock().unwrap().iter().for_each(|(_k,c)| {
if let Some(circuit_id) = &c.circuit_id {
if let Some(accumulator) = circuits.get_mut(circuit_id) {
accumulator.bytes += c.bytes_per_second;
@@ -445,11 +445,11 @@ impl FinishedFlowAnalysis {
}

impl FlowbeeRecipient for FinishedFlowAnalysis {
fn enqueue(&self, key: FlowbeeKey, mut data: FlowbeeLocalData, analysis: FlowAnalysis) {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis) {
debug!("Finished flow analysis");
let one_way = data.bytes_sent.down == 0 || data.bytes_sent.up == 0;
if !one_way {
data.trim(); // Remove the trailing 30 seconds of zeroes
//data.trim(); // Remove the trailing 30 seconds of zeroes
//let tp_buf_dn = data.throughput_buffer.iter().map(|v| v.down).collect();
//let tp_buf_up = data.throughput_buffer.iter().map(|v| v.up).collect();
lts2_sys::two_way_flow(
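With `data.trim()` commented out, nothing in `enqueue` mutates `data` any more, so the `mut` on the parameter only triggered the `unused_mut` lint and is dropped. A tiny illustration (types are invented for the example):

```rust
struct FlowRecord {
    bytes: Vec<u64>,
}

// warning: variable does not need to be mutable
// fn enqueue(mut record: FlowRecord) { ... }
//
// Once nothing in the body mutates `record`, the binding should lose `mut`:
fn enqueue(record: FlowRecord) {
    println!("finished flow with {} samples", record.bytes.len());
}

fn main() {
    enqueue(FlowRecord { bytes: vec![1, 2, 3] });
}
```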
25 changes: 0 additions & 25 deletions src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs
@@ -74,28 +74,3 @@ impl From<&FlowbeeData> for FlowbeeLocalData {
}
}
}

impl FlowbeeLocalData {
pub fn trim(&mut self) {
// Find the point at which the throughput buffer starts being all zeroes
let mut last_start: Option<usize> = None;
let mut in_zero_run = false;

/*for (i, &value) in self.throughput_buffer.iter().enumerate() {
if value.down == 0 && value.up == 0 {
if !in_zero_run {
in_zero_run = true;
last_start = Some(i);
}
} else {
in_zero_run = false;
}
}*/

if let Some(start_index) = last_start {
// There's a run of zeroes terminating the throughput buffer
// That means we need to truncate the buffer
//self.throughput_buffer.truncate(start_index);
}
}
}
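The whole `FlowbeeLocalData::trim` helper was already dead — its scan loop and the final `truncate` were commented out — so deleting it removes both dead code and the unused `in_zero_run` / `last_start` warnings. For reference, a sketch of what the trim was meant to do (truncate a trailing run of all-zero throughput samples), reconstructed from the commented-out code and therefore approximate:

```rust
#[derive(Clone, Copy)]
struct DownUp {
    down: u64,
    up: u64,
}

/// Drop the trailing run of all-zero samples from a throughput buffer.
/// Reconstruction of the removed (and never-enabled) trim logic.
fn trim_trailing_zeroes(throughput_buffer: &mut Vec<DownUp>) {
    let mut last_start: Option<usize> = None;
    let mut in_zero_run = false;

    for (i, value) in throughput_buffer.iter().enumerate() {
        if value.down == 0 && value.up == 0 {
            if !in_zero_run {
                in_zero_run = true;
                last_start = Some(i); // remember where this zero run began
            }
        } else {
            in_zero_run = false;
            last_start = None; // the run did not reach the end of the buffer
        }
    }

    if let Some(start_index) = last_start {
        // A run of zeroes terminates the buffer: cut it off.
        throughput_buffer.truncate(start_index);
    }
}

fn main() {
    let mut buf = vec![
        DownUp { down: 10, up: 2 },
        DownUp { down: 0, up: 0 },
        DownUp { down: 0, up: 0 },
    ];
    trim_trailing_zeroes(&mut buf);
    assert_eq!(buf.len(), 1);
}
```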
58 changes: 29 additions & 29 deletions src/rust/lqosd/src/throughput_tracker/mod.rs
@@ -477,8 +477,8 @@ fn submit_throughput_stats(
.raw_data
.lock().unwrap()
.iter()
.filter(|(k,h)| h.circuit_id.is_some() && h.bytes_per_second.not_zero())
.for_each(|(k,h)| {
.filter(|(_k,h)| h.circuit_id.is_some() && h.bytes_per_second.not_zero())
.for_each(|(_k,h)| {
if let Some(c) = circuit_throughput.get_mut(&h.circuit_hash.unwrap()) {
c.bytes += h.bytes_per_second;
c.packets += h.packets_per_second;
@@ -501,8 +501,8 @@
.lock()
.unwrap()
.iter()
.filter(|(k,h)| h.circuit_id.is_some() && h.tcp_retransmits.not_zero())
.for_each(|(k,h)| {
.filter(|(_k,h)| h.circuit_id.is_some() && h.tcp_retransmits.not_zero())
.for_each(|(_k,h)| {
if let Some(c) = circuit_retransmits.get_mut(&h.circuit_hash.unwrap()) {
*c += h.tcp_retransmits;
} else {
@@ -515,8 +515,8 @@
.lock()
.unwrap()
.iter()
.filter(|(k,h)| h.circuit_id.is_some() && h.median_latency().is_some())
.for_each(|(k,h)| {
.filter(|(_k,h)| h.circuit_id.is_some() && h.median_latency().is_some())
.for_each(|(_k,h)| {
if let Some(c) = circuit_rtt.get_mut(&h.circuit_hash.unwrap()) {
c.push(h.median_latency().unwrap());
} else {
@@ -778,8 +778,8 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
.lock()
.unwrap()
.iter()
.filter(|(k,v)| !k.as_ip().is_loopback())
.filter(|(k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|(k,_v)| !k.as_ip().is_loopback())
.filter(|(_k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.map(|(k,te)| {
(
*k,
@@ -831,9 +831,9 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
.lock()
.unwrap()
.iter()
.filter(|(k,v)| !k.as_ip().is_loopback())
.filter(|(k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|(k,te)| te.median_latency().is_some())
.filter(|(k,_v)| !k.as_ip().is_loopback())
.filter(|(_k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|(_k,te)| te.median_latency().is_some())
.map(|(k,te)| {
(
*k,
@@ -885,9 +885,9 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse {
.lock()
.unwrap()
.iter()
.filter(|(k,v)| !k.as_ip().is_loopback())
.filter(|(k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|(k,te)| te.median_latency().is_some())
.filter(|(k,_v)| !k.as_ip().is_loopback())
.filter(|(_k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|(_k,te)| te.median_latency().is_some())
.map(|(k,te)| {
(
*k,
@@ -943,9 +943,9 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
.lock()
.unwrap()
.iter()
.filter(|(k,v)| !k.as_ip().is_loopback())
.filter(|(k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|(k,te)| te.median_latency().is_some())
.filter(|(k,_v)| !k.as_ip().is_loopback())
.filter(|(_k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|(_k,te)| te.median_latency().is_some())
.map(|(k, te)| {
(
*k,
@@ -997,8 +997,8 @@ pub fn xdp_pping_compat() -> BusResponse {
.lock()
.unwrap()
.iter()
.filter(|(k,d)| retire_check(raw_cycle, d.most_recent_cycle))
.filter_map(|(k,data)| {
.filter(|(_k,d)| retire_check(raw_cycle, d.most_recent_cycle))
.filter_map(|(_k,data)| {
if data.tc_handle.as_u32() > 0 {
let mut valid_samples: Vec<u32> = data
.recent_rtt_data
@@ -1053,8 +1053,8 @@ pub fn min_max_median_rtt() -> Option<MinMaxMedianRtt> {
.lock()
.unwrap()
.iter()
.filter(|(k,d)| retire_check(reader_cycle, d.most_recent_cycle))
.for_each(|(k,d)| {
.filter(|(_k,d)| retire_check(reader_cycle, d.most_recent_cycle))
.for_each(|(_k,d)| {
samples.extend(
d.recent_rtt_data
.iter()
@@ -1101,8 +1101,8 @@ pub fn min_max_median_tcp_retransmits() -> TcpRetransmitTotal {
.lock()
.unwrap()
.iter()
.filter(|(k,d)| retire_check(reader_cycle, d.most_recent_cycle))
.for_each(|(k,d)| {
.filter(|(_k,d)| retire_check(reader_cycle, d.most_recent_cycle))
.for_each(|(_k,d)| {
total.up += d.tcp_retransmits.up as i32;
total.down += d.tcp_retransmits.down as i32;
});
@@ -1120,7 +1120,7 @@ pub fn rtt_histogram<const N: usize>() -> BusResponse {
.lock()
.unwrap()
.iter()
.filter(|(k,d)| retire_check(reader_cycle, d.most_recent_cycle))
.filter(|(_k,d)| retire_check(reader_cycle, d.most_recent_cycle))
{
let valid_samples: Vec<f64> = data
.recent_rtt_data
@@ -1151,8 +1151,8 @@ pub fn host_counts() -> BusResponse {
.lock()
.unwrap()
.iter()
.filter(|(k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.for_each(|(k,d)| {
.filter(|(_k,d)| retire_check(tp_cycle, d.most_recent_cycle))
.for_each(|(_k,d)| {
total += 1;
if d.tc_handle.as_u32() != 0 {
shaped += 1;
@@ -1181,9 +1181,9 @@ pub fn all_unknown_ips() -> BusResponse {
.lock()
.unwrap()
.iter()
.filter(|(k,v)| !k.as_ip().is_loopback())
.filter(|(k,d)| d.tc_handle.as_u32() == 0)
.filter(|(k,d)| d.last_seen as u128 > five_minutes_ago_nanoseconds)
.filter(|(k,_v)| !k.as_ip().is_loopback())
.filter(|(_k,d)| d.tc_handle.as_u32() == 0)
.filter(|(_k,d)| d.last_seen as u128 > five_minutes_ago_nanoseconds)
.map(|(k,te)| {
(
*k,
21 changes: 10 additions & 11 deletions src/rust/lqosd/src/throughput_tracker/tracking_data.rs
@@ -4,7 +4,6 @@ use std::sync::Mutex;
use allocative_derive::Allocative;
use crate::{shaped_devices_tracker::SHAPED_DEVICES, stats::HIGH_WATERMARK, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}};
use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use fxhash::FxHashMap;
use tracing::{debug, info, warn};
use lqos_bus::TcHandle;
@@ -48,7 +47,7 @@ impl ThroughputTracker {
// Copy previous byte/packet numbers and reset RTT data
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
let mut raw_data = self.raw_data.lock().unwrap();
raw_data.iter_mut().for_each(|(k,v)| {
raw_data.iter_mut().for_each(|(_k,v)| {
if v.first_cycle < self_cycle {
v.bytes_per_second = v.bytes.checked_sub_or_zero(v.prev_bytes);
v.packets_per_second = v.packets.checked_sub_or_zero(v.prev_packets);
@@ -125,7 +124,7 @@ impl ThroughputTracker {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
let mut raw_data = self.raw_data.lock().unwrap();
throughput_for_each(&mut |xdp_ip, counts| {
if let Some(mut entry) = raw_data.get_mut(xdp_ip) {
if let Some(entry) = raw_data.get_mut(xdp_ip) {
// Zero the counter, we have to do a per-CPU sum
entry.bytes = DownUpOrder::zeroed();
entry.packets = DownUpOrder::zeroed();
@@ -221,9 +220,9 @@ impl ThroughputTracker {
ALL_QUEUE_SUMMARY.calculate_total_queue_stats();

// Iterate through the queue data and find the matching circuit_id
let mut raw_data = self.raw_data.lock().unwrap();
let raw_data = self.raw_data.lock().unwrap();
ALL_QUEUE_SUMMARY.iterate_queues(|circuit_hash, drops, marks| {
if let Some((_k,entry)) = raw_data.iter().find(|(k,v)| {
if let Some((_k,entry)) = raw_data.iter().find(|(_k,v)| {
match v.circuit_hash {
Some(ref id) => *id == circuit_hash,
None => false,
@@ -280,7 +279,7 @@ impl ThroughputTracker {
this_flow.0.retry_times_up.push(data.last_seen);
}

let change_since_last_time = data.bytes_sent.checked_sub_or_zero(this_flow.0.bytes_sent);
//let change_since_last_time = data.bytes_sent.checked_sub_or_zero(this_flow.0.bytes_sent);
//this_flow.0.throughput_buffer.push(change_since_last_time);
//println!("{change_since_last_time:?}");

@@ -347,7 +346,7 @@ impl ThroughputTracker {
if !rtts.is_empty() {
rtts.sort();
let median = rtts[rtts.len() / 2];
if let Some(mut tracker) = raw_data.get_mut(&local_ip) {
if let Some(tracker) = raw_data.get_mut(&local_ip) {
// Only apply if the flow has achieved 1 Mbps or more
if tracker.bytes_per_second.sum_exceeds(125_000) {
// Shift left
@@ -373,7 +372,7 @@
}
// Apply the new ones
for (local_ip, retries) in tcp_retries {
if let Some(mut tracker) = raw_data.get_mut(&local_ip) {
if let Some(tracker) = raw_data.get_mut(&local_ip) {
tracker.tcp_retransmits.down = retries.down.saturating_sub(tracker.prev_tcp_retransmits.down);
tracker.tcp_retransmits.up = retries.up.saturating_sub(tracker.prev_tcp_retransmits.up);
tracker.prev_tcp_retransmits.down = retries.down;
@@ -416,14 +415,14 @@ impl ThroughputTracker {
self.udp_packets_per_second.set_to_zero();
self.icmp_packets_per_second.set_to_zero();
self.shaped_bytes_per_second.set_to_zero();
let mut raw_data = self.raw_data.lock().unwrap();
let raw_data = self.raw_data.lock().unwrap();
raw_data
.iter()
.filter(|(k,v)|
.filter(|(_k,v)|
v.most_recent_cycle == current_cycle &&
v.first_cycle + 2 < current_cycle
)
.map(|(k,v)| {
.map(|(_k,v)| {
(
v.bytes.down.saturating_sub(v.prev_bytes.down),
v.bytes.up.saturating_sub(v.prev_bytes.up),
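The last group of changes drops `mut` from bindings that never needed it: `get_mut` already hands back a mutable reference, so `if let Some(entry) = raw_data.get_mut(..)` can mutate through `entry` without marking the binding itself `mut` (that would only matter if the binding were reassigned), and lock guards that are only iterated over can lose their `mut` as well. A small standalone illustration:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

fn main() {
    let raw_data: Mutex<HashMap<&str, u64>> = Mutex::new(HashMap::from([("host-a", 0)]));

    {
        // Mutation path: the guard must be `mut` to call get_mut through it,
        // but the `entry` binding itself does not need `mut` -- it is already
        // an `&mut u64`, and we never reassign the binding.
        let mut guard = raw_data.lock().unwrap();
        if let Some(entry) = guard.get_mut("host-a") {
            *entry += 42;
        }
    }

    {
        // Read-only path: iteration never mutates, so neither the guard nor
        // the closure bindings need `mut`.
        let guard = raw_data.lock().unwrap();
        let total: u64 = guard.iter().map(|(_k, v)| *v).sum();
        println!("total bytes: {total}");
    }
}
```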
