Skip to content

Commit

Permalink
Adds Allocative tracking (just all hosts for now). Enabled if you use…
Browse files Browse the repository at this point in the history
… the feature flag "flamegraphs" on `lqosd` - it will dump an SVG to /tmp every minute with the current buffer status and details.
  • Loading branch information
thebracket committed Nov 13, 2024
1 parent 7b68ef5 commit 12606e5
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 53 deletions.
137 changes: 92 additions & 45 deletions src/rust/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ip_network_table = "0"
ip_network = "0"
sha2 = "0"
uuid = { version = "1", features = ["v4", "fast-rng" ] }
dashmap = "=5.1.0"
dashmap = "^5.5.3"
toml = "0.8.8"
zerocopy = {version = "0.8.5", features = [ "derive", "zerocopy-derive", "simd" ] }
sysinfo = { version = "0", default-features = false, features = [ "system" ] }
Expand Down
4 changes: 4 additions & 0 deletions src/rust/lqos_bus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ tracing = { workspace = true }
nix = { workspace = true }
serde_cbor = { workspace = true }

# For memory debugging
allocative = { version = "0.3.3", features = [ "dashmap" ] }
allocative_derive = "0.3.3"

[dev-dependencies]
criterion = { version = "0", features = [ "html_reports", "async_tokio"] }

Expand Down
3 changes: 2 additions & 1 deletion src/rust/lqos_bus/src/tc_handle.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use allocative_derive::Allocative;
use tracing::error;
use lqos_utils::hex_string::read_hex_string;
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Provides consistent handling of TC handle types.
#[derive(
Copy, Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq, Hash
Copy, Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq, Hash, Allocative
)]
pub struct TcHandle(u32);

Expand Down
4 changes: 4 additions & 0 deletions src/rust/lqos_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ byteorder = { workspace = true }
zerocopy = { workspace = true }
num-traits = { workspace = true }
crossbeam-channel = { workspace = true }

# For memory debugging
allocative = { version = "0.3.3", features = [ "dashmap" ] }
allocative_derive = "0.3.3"
3 changes: 2 additions & 1 deletion src/rust/lqos_utils/src/units/atomic_down_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use allocative_derive::Allocative;
use crate::units::DownUpOrder;

/// AtomicDownUp is a struct that contains two atomic u64 values, one for down and one for up.
Expand All @@ -12,7 +13,7 @@ use crate::units::DownUpOrder;
///
/// Note that unlike the DownUpOrder struct, it is not intended for direct serialization, and
/// is not generic.
#[derive(Debug)]
#[derive(Debug, Allocative)]
pub struct AtomicDownUp {
down: AtomicU64,
up: AtomicU64,
Expand Down
3 changes: 2 additions & 1 deletion src/rust/lqos_utils/src/units/down_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! helps reduce directional confusion/bugs.

use std::ops::AddAssign;
use allocative_derive::Allocative;
use serde::{Deserialize, Serialize};
use zerocopy::FromBytes;
use crate::units::UpDownOrder;
Expand All @@ -11,7 +12,7 @@ use crate::units::UpDownOrder;
/// stored statistics to eliminate confusion. This is a generic
/// type: you can control the type stored inside.
#[repr(C)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, FromBytes, Default, Ord, PartialOrd)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, FromBytes, Default, Ord, PartialOrd, Allocative)]
pub struct DownUpOrder<T> {
/// The down value
pub down: T,
Expand Down
3 changes: 2 additions & 1 deletion src/rust/lqos_utils/src/xdp_ip_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::fmt::Display;
use byteorder::{BigEndian, ByteOrder};
use zerocopy::FromBytes;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use allocative_derive::Allocative;

/// XdpIpAddress provides helpful conversion between the XDP program's
/// native storage of IP addresses in `[u8; 16]` blocks of bytes and
/// Rust `IpAddr` types.
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, FromBytes)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, FromBytes, Allocative)]
pub struct XdpIpAddress(pub [u8; 16]);

impl Default for XdpIpAddress {
Expand Down
4 changes: 4 additions & 0 deletions src/rust/lqosd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "GPL-2.0-only"
[features]
default = ["equinix_tests"]
equinix_tests = []
flamegraphs = []

[dependencies]
anyhow = { workspace = true }
Expand Down Expand Up @@ -53,6 +54,9 @@ crossbeam-channel = { workspace = true }
arc-swap = { workspace = true }
crossbeam-queue = { workspace = true }

# For memory debugging
allocative = { version = "0.3.3", features = [ "dashmap" ] }
allocative_derive = "0.3.3"

# Support JemAlloc on supported platforms
#[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]
Expand Down
27 changes: 26 additions & 1 deletion src/rust/lqosd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ mod preflight_checks;
mod node_manager;
mod system_stats;

use std::io::Write;
use std::net::IpAddr;
use allocative::Allocative;
use crate::{
file_lock::FileLock,
ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow}, throughput_tracker::flow_data::{flowbee_handle_events, setup_netflow_tracker, FlowActor},
Expand Down Expand Up @@ -42,7 +44,7 @@ use crate::ip_mapping::clear_hot_cache;
//use mimalloc::MiMalloc;

use tracing::level_filters::LevelFilter;

use crate::throughput_tracker::THROUGHPUT_TRACKER;
// Use JemAllocator only on supported platforms
//#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
//#[global_allocator]
Expand Down Expand Up @@ -184,6 +186,9 @@ fn main() -> Result<()> {
// Create the socket server
let server = UnixSocketServer::new().expect("Unable to spawn server");

// Memory Debugging
memory_debug();

let handle = std::thread::Builder::new().name("Async Bus/Web".to_string()).spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down Expand Up @@ -212,6 +217,26 @@ fn main() -> Result<()> {
Ok(())
}

#[cfg(feature = "flamegraphs")]
fn memory_debug() {
std::thread::spawn(|| {
loop {
std::thread::sleep(std::time::Duration::from_secs(60));
let mut fb = allocative::FlameGraphBuilder::default();
fb.visit_global_roots();
fb.visit_root(&*THROUGHPUT_TRACKER);
let flamegraph_src = fb.finish();
let flamegraph_src = flamegraph_src.flamegraph();
let mut file = std::fs::File::create("/tmp/lqosd-mem.svg").unwrap();
file.write_all(flamegraph_src.write().as_bytes()).unwrap();
info!("Wrote flamegraph to /tmp/lqosd-mem.svg");
}
});
}

#[cfg(not(feature = "flamegraphs"))]
fn memory_debug() {}

fn handle_bus_requests(
requests: &[BusRequest],
responses: &mut Vec<BusResponse>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
//! multipliers, divisors, etc. It is intended to become pervasive
//! throughout the system.

use allocative_derive::Allocative;
use serde::Serialize;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Allocative)]
pub struct RttData {
nanoseconds: u64,
}
Expand Down
3 changes: 2 additions & 1 deletion src/rust/lqosd/src/throughput_tracker/throughput_entry.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use allocative_derive::Allocative;
use lqos_bus::TcHandle;
use lqos_utils::units::DownUpOrder;
use super::flow_data::RttData;

#[derive(Debug)]
#[derive(Debug, Allocative)]
pub(crate) struct ThroughputEntry {
pub(crate) circuit_id: Option<String>,
pub(crate) circuit_hash: Option<i64>,
Expand Down
13 changes: 13 additions & 0 deletions src/rust/lqosd/src/throughput_tracker/tracking_data.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{sync::atomic::AtomicU64, time::Duration};
use allocative_derive::Allocative;
use crate::{shaped_devices_tracker::SHAPED_DEVICES, stats::HIGH_WATERMARK, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}};
use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
Expand All @@ -11,6 +12,7 @@ use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use lqos_utils::units::{AtomicDownUp, DownUpOrder};

#[derive(Allocative)]
pub struct ThroughputTracker {
pub(crate) cycle: AtomicU64,
pub(crate) raw_data: DashMap<XdpIpAddress, ThroughputEntry>,
Expand Down Expand Up @@ -459,6 +461,17 @@ impl ThroughputTracker {

pub(crate) fn next_cycle(&self) {
self.cycle.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

// Cleanup
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let timeout_seconds = 5 * 60; // 5 minutes
let expire = (since_boot - Duration::from_secs(timeout_seconds)).as_nanos() as u64;
self.raw_data.retain(|k, v| {
v.last_seen >= expire
});
self.raw_data.shrink_to_fit();
}
}

pub(crate) fn bits_per_second(&self) -> DownUpOrder<u64> {
Expand Down

0 comments on commit 12606e5

Please sign in to comment.