diff --git a/Cargo.lock b/Cargo.lock index f1a0fbf5..67c4d3bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,6 +280,12 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" +[[package]] +name = "dyn-clone" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21e50f3adc76d6a43f5ed73b698a87d0760ca74617f60f7c3b879003536fdd28" + [[package]] name = "encoding_rs" version = "0.8.28" @@ -1147,6 +1153,7 @@ dependencies = [ "clap", "colored", "docker-sync", + "dyn-clone", "hostname", "hyper", "k8s-sync", diff --git a/Cargo.toml b/Cargo.toml index 1cc7f3fc..ed777348 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ k8s-sync = { version = "0.2.3", optional = true } #k8s-sync = { path = "../rs-k8s-sync", optional = true } hyper = { version = "0.14", features = ["full"], optional = true } tokio = { version = "1", features = ["full"], optional = true} +dyn-clone = "1.0.5" [features] default = ["prometheus", "riemann", "warp10", "containers", "json"] diff --git a/src/exporters/json.rs b/src/exporters/json.rs index c687bfa3..71226462 100644 --- a/src/exporters/json.rs +++ b/src/exporters/json.rs @@ -225,7 +225,7 @@ impl JSONExporter { .filter_map(|socket| { if let Some(metric) = metrics_iter.find(|x| { if x.name == "scaph_socket_power_microwatts" { - socket.id + socket.get_id() == x.attributes .get("socket_id") .unwrap() @@ -247,7 +247,7 @@ impl JSONExporter { .unwrap() .parse::() .unwrap() - == socket.id + == socket.get_id() }) .map(|d| Domain { name: d.name.clone(), @@ -257,7 +257,7 @@ impl JSONExporter { .collect::>(); Some(Socket { - id: socket.id, + id: socket.get_id(), consumption: (socket_power as f32), domains, timestamp: metric.timestamp.as_secs_f64(), diff --git a/src/exporters/mod.rs b/src/exporters/mod.rs index 6cc26d9e..a5e56934 100644 --- a/src/exporters/mod.rs +++ b/src/exporters/mod.rs @@ -319,7 +319,7 @@ impl MetricGenerator { for socket in &self.topology.sockets { let mut attributes = HashMap::new(); - attributes.insert("socket_id".to_string(), socket.id.to_string()); + attributes.insert("socket_id".to_string(), socket.get_id().to_string()); self.data.push(Metric { name: String::from("scaph_self_socket_stats_nb"), @@ -331,7 +331,7 @@ impl MetricGenerator { tags: vec!["scaphandre".to_string()], attributes: attributes.clone(), description: String::from("Number of CPUStat traces stored for each socket"), - metric_value: MetricValueType::IntUnsigned(socket.stat_buffer.len() as u64), + metric_value: MetricValueType::IntUnsigned(socket.get_stat_buffer_passive().len() as u64), }); self.data.push(Metric { @@ -346,10 +346,10 @@ impl MetricGenerator { description: String::from( "Number of energy consumption Records stored for each socket", ), - metric_value: MetricValueType::IntUnsigned(socket.record_buffer.len() as u64), + metric_value: MetricValueType::IntUnsigned(socket.get_record_buffer_passive().len() as u64), }); - for domain in &socket.domains { + for domain in socket.get_domains_passive() { attributes.insert("rapl_domain_name".to_string(), domain.name.to_string()); self.data.push(Metric { @@ -422,7 +422,7 @@ impl MetricGenerator { let metric_timestamp = metric.timestamp; let mut attributes = HashMap::new(); - attributes.insert("socket_id".to_string(), socket.id.to_string()); + attributes.insert("socket_id".to_string(), socket.get_id().to_string()); self.data.push(Metric { name: String::from("scaph_socket_energy_microjoules"), @@ -466,7 +466,7 @@ impl MetricGenerator { let mut attributes = HashMap::new(); attributes.insert("domain_name".to_string(), domain.name.clone()); attributes.insert("domain_id".to_string(), domain.id.to_string()); - attributes.insert("socket_id".to_string(), socket.id.to_string()); + attributes.insert("socket_id".to_string(), socket.get_id().to_string()); self.data.push(Metric { name: String::from("scaph_domain_energy_microjoules"), diff --git a/src/exporters/warpten.rs b/src/exporters/warpten.rs index 70343ce0..d82b025f 100644 --- a/src/exporters/warpten.rs +++ b/src/exporters/warpten.rs @@ -235,8 +235,8 @@ impl Warp10Exporter { for socket in &self.topology.sockets { let mut metric_labels = labels.clone(); - metric_labels.push(warp10::Label::new("socket_id", &socket.id.to_string())); - let metric_value = socket.stat_buffer.len(); + metric_labels.push(warp10::Label::new("socket_id", &socket.get_id().to_string())); + let metric_value = socket.get_stat_buffer_passive().len(); data.push(warp10::Data::new( time::OffsetDateTime::now_utc(), None, @@ -244,7 +244,7 @@ impl Warp10Exporter { metric_labels.clone(), warp10::Value::Int(metric_value as i32), )); - let metric_value = socket.record_buffer.len(); + let metric_value = socket.get_record_buffer_passive().len(); data.push(warp10::Data::new( time::OffsetDateTime::now_utc(), None, @@ -277,7 +277,7 @@ impl Warp10Exporter { } } - for domain in &socket.domains { + for domain in socket.get_domains_passive() { let mut metric_labels = labels.clone(); metric_labels.push(warp10::Label::new("rapl_domain_name", &domain.name)); let metric_value = domain.record_buffer.len(); diff --git a/src/lib.rs b/src/lib.rs index da878f71..b55f5087 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,7 @@ use exporters::{ json::JSONExporter, prometheus::PrometheusExporter, qemu::QemuExporter, riemann::RiemannExporter, stdout::StdoutExporter, warpten::Warp10Exporter, Exporter, }; -use sensors::{powercap_rapl::PowercapRAPLSensor, Sensor}; +use sensors::{powercap_rapl::PowercapRAPLSensor, Sensor, debug::DebugSensor}; use std::collections::HashMap; use std::time::{Duration, SystemTime}; @@ -27,8 +27,8 @@ fn get_argument(matches: &ArgMatches, arg: &'static str) -> String { /// Helper function to get a Sensor instance from ArgMatches fn get_sensor(matches: &ArgMatches) -> Box { - let sensor = match &get_argument(matches, "sensor")[..] { - "powercap_rapl" => PowercapRAPLSensor::new( + match &get_argument(matches, "sensor")[..] { + "powercap_rapl" => Box::new(PowercapRAPLSensor::new( get_argument(matches, "sensor-buffer-per-socket-max-kB") .parse() .unwrap(), @@ -36,8 +36,13 @@ fn get_sensor(matches: &ArgMatches) -> Box { .parse() .unwrap(), matches.is_present("vm"), - ), - _ => PowercapRAPLSensor::new( + )), + "debug" => Box::new(DebugSensor::new( + get_argument(matches, "sensor-buffer-per-socket-max-kB") + .parse() + .unwrap(), + )), + _ => Box::new(PowercapRAPLSensor::new( get_argument(matches, "sensor-buffer-per-socket-max-kB") .parse() .unwrap(), @@ -45,9 +50,8 @@ fn get_sensor(matches: &ArgMatches) -> Box { .parse() .unwrap(), matches.is_present("vm"), - ), - }; - Box::new(sensor) + )) + } } /// Matches the sensor and exporter name and options requested from the command line and diff --git a/src/main.rs b/src/main.rs index ed14ebd2..e6f60fcc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use clap::{crate_authors, crate_version, App, AppSettings, Arg, SubCommand}; use scaphandre::{get_exporters_options, run}; fn main() { - let sensors = ["powercap_rapl"]; + let sensors = ["powercap_rapl", "debug"]; let exporters_options = get_exporters_options(); let exporters = exporters_options.keys(); let exporters: Vec<&str> = exporters.into_iter().map(|x| x.as_str()).collect(); diff --git a/src/sensors/debug.rs b/src/sensors/debug.rs new file mode 100644 index 00000000..d7f289ef --- /dev/null +++ b/src/sensors/debug.rs @@ -0,0 +1,127 @@ +use crate::sensors::{Sensor,Topology,Domain,Socket,Record,CPUStat,CPUCore}; +use std::error::Error; +use super::{utils::{current_system_time_since_epoch}, units}; + +pub struct DebugSensor { + buffer_per_socket_max_kbytes: u16, +} + +impl DebugSensor { + pub fn new(buffer_per_socket_max_kbytes: u16,) -> DebugSensor { + DebugSensor { + buffer_per_socket_max_kbytes + } + } +} + +impl Sensor for DebugSensor { + fn generate_topology(&self) -> Result> { + let mut topo = Topology::new(); + topo.safe_add_socket(DebugSocket::new(1234, self.buffer_per_socket_max_kbytes)); + topo.safe_add_domain_to_socket( + 1234, + 4321, + "debug domain", + "debug domain uj_counter", + self.buffer_per_socket_max_kbytes + ); + Ok(topo) + } + + fn get_topology(&mut self) -> Box> { + Box::new(self.generate_topology().ok()) + } +} + +#[derive(Debug, Clone)] +pub struct DebugSocket { + /// Numerical ID of the CPU socket (physical_id in /proc/cpuinfo) + pub id: u16, + /// RAPL domains attached to the socket + pub domains: Vec, + /// Comsumption records measured and stored by scaphandre for this socket. + pub record_buffer: Vec, + /// Maximum size of the record_buffer in kilobytes. + pub buffer_max_kbytes: u16, + /// CPU cores (core_id in /proc/cpuinfo) attached to the socket. + pub cpu_cores: Vec, + /// Usage statistics records stored for this socket. + pub stat_buffer: Vec, +} + +impl DebugSocket { + pub fn new(id: u16, buffer_max_kbytes: u16) -> DebugSocket { + DebugSocket { + id, + domains: vec![], + record_buffer: vec![], + buffer_max_kbytes, + cpu_cores: vec![], + stat_buffer: vec![] + } + } +} + +impl Socket for DebugSocket { + fn read_record_uj(&self) -> Result> { + Ok(Record::new( + current_system_time_since_epoch(), + String::from("7081760374"), + units::Unit::MicroJoule, + )) + } + + /// Combines stats from all CPU cores owned byu the socket and returns + /// a CpuTime struct containing stats for the whole socket. + fn read_stats(&self) -> Option { + None + } + + fn get_id(&self) -> u16 { + self.id + } + + fn get_record_buffer(&mut self) -> &mut Vec { + &mut self.record_buffer + } + + fn get_record_buffer_passive(&self) -> &Vec { + &self.record_buffer + } + + fn get_buffer_max_kbytes(&self) -> u16 { + self.buffer_max_kbytes + } + + /// Returns a mutable reference to the domains vector. + fn get_domains(&mut self) -> &mut Vec { + &mut self.domains + } + + /// Returns a immutable reference to the domains vector. + fn get_domains_passive(&self) -> &Vec { + &self.domains + } + + /// Returns a mutable reference to the CPU cores vector. + fn get_cores(&mut self) -> &mut Vec { + &mut self.cpu_cores + } + + /// Returns a immutable reference to the CPU cores vector. + fn get_cores_passive(&self) -> &Vec { + &self.cpu_cores + } + + fn get_stat_buffer(&mut self) -> &mut Vec { + &mut self.stat_buffer + } + + fn get_stat_buffer_passive(&self) -> &Vec { + &self.stat_buffer + } + + fn get_debug_type(&self) -> String { + String::from("Debug") + } +} \ No newline at end of file diff --git a/src/sensors/mod.rs b/src/sensors/mod.rs index b3d8a0f6..c2601258 100644 --- a/src/sensors/mod.rs +++ b/src/sensors/mod.rs @@ -4,8 +4,11 @@ //! needed to implement a sensor. pub mod powercap_rapl; +pub mod debug; +pub mod socket; pub mod units; pub mod utils; +use core::fmt::Debug; use procfs::{process, CpuInfo, CpuTime, KernelStats}; use std::collections::HashMap; use std::error::Error; @@ -13,6 +16,7 @@ use std::mem::size_of_val; use std::time::Duration; use std::{fmt, fs}; use utils::{current_system_time_since_epoch, ProcessTracker}; +use socket::Socket; // !!!!!!!!!!!!!!!!! Sensor !!!!!!!!!!!!!!!!!!!!!!! /// Sensor trait, the Sensor API. @@ -29,6 +33,12 @@ pub trait RecordGenerator { fn clean_old_records(&mut self); } +pub trait StatsGenerator { + fn clean_old_stats(&mut self); + fn refresh_stats(&mut self); + fn get_stats_diff(&mut self) -> Option; +} + // !!!!!!!!!!!!!!!!! Topology !!!!!!!!!!!!!!!!!!!!!!! /// Topology struct represents the whole CPUSocket architecture, /// from the electricity consumption point of view, @@ -37,7 +47,7 @@ pub trait RecordGenerator { #[derive(Debug, Clone)] pub struct Topology { /// The CPU sockets found on the host, represented as CPUSocket instances attached to this topology - pub sockets: Vec, + pub sockets: Vec>, /// ProcessTrack instance that keeps track of processes running on the host and CPU stats associated pub proc_tracker: ProcessTracker, /// CPU usage stats buffer @@ -174,21 +184,10 @@ impl Topology { /// socket id doesn't exist already. pub fn safe_add_socket( &mut self, - socket_id: u16, - domains: Vec, - attributes: Vec>>, - counter_uj_path: String, - buffer_max_kbytes: u16, + socket: impl Socket + 'static ) { - if !self.sockets.iter().any(|s| s.id == socket_id) { - let socket = CPUSocket::new( - socket_id, - domains, - attributes, - counter_uj_path, - buffer_max_kbytes, - ); - self.sockets.push(socket); + if !self.sockets.iter().any(|s| s.get_id() == socket.get_id()) { + self.sockets.push(Box::new(socket)); } } @@ -198,12 +197,12 @@ impl Topology { } /// Returns a mutable reference to self.sockets - pub fn get_sockets(&mut self) -> &mut Vec { + pub fn get_sockets(&mut self) -> &mut Vec> { &mut self.sockets } /// Returns an immutable reference to self.sockets - pub fn get_sockets_passive(&self) -> &Vec { + pub fn get_sockets_passive(&self) -> &Vec> { &self.sockets } @@ -232,7 +231,7 @@ impl Topology { ) { let iterator = self.sockets.iter_mut(); for socket in iterator { - if socket.id == socket_id { + if socket.get_id() == socket_id { socket.safe_add_domain(Domain::new( domain_id, String::from(name), @@ -259,9 +258,9 @@ impl Topology { let socket = self .sockets .iter_mut() - .find(|x| &x.id == socket_id) + .find(|x| &x.get_id() == socket_id) .expect("Trick: if you are running on a vm, do not forget to use --vm parameter invoking scaphandre at the command line"); - if socket_id == &socket.id { + if socket_id == &socket.get_id() { socket.add_cpu_core(c); } } @@ -568,103 +567,8 @@ pub struct CPUSocket { pub stat_buffer: Vec, } -impl RecordGenerator for CPUSocket { - /// Generates a new record of the socket energy consumption and stores it in the record_buffer. - /// Returns a clone of this Record instance. - fn refresh_record(&mut self) { - if let Ok(record) = self.read_record_uj() { - self.record_buffer.push(record); - } - - if !self.record_buffer.is_empty() { - self.clean_old_records(); - } - } - - /// Checks the size in memory of record_buffer and deletes as many Record - /// instances from the buffer to make it smaller in memory than buffer_max_kbytes. - fn clean_old_records(&mut self) { - let record_ptr = &self.record_buffer[0]; - let curr_size = size_of_val(record_ptr) * self.record_buffer.len(); - trace!( - "socket rebord buffer current size: {} max_bytes: {}", - curr_size, - self.buffer_max_kbytes * 1000 - ); - if curr_size > (self.buffer_max_kbytes * 1000) as usize { - let size_diff = curr_size - (self.buffer_max_kbytes * 1000) as usize; - trace!( - "socket record size_diff: {} sizeof: {}", - size_diff, - size_of_val(record_ptr) - ); - if size_diff > size_of_val(record_ptr) { - let nb_records_to_delete = size_diff as f32 / size_of_val(record_ptr) as f32; - for _ in 1..nb_records_to_delete as u32 { - if !self.record_buffer.is_empty() { - let res = self.record_buffer.remove(0); - debug!( - "Cleaning socket id {} records buffer, removing: {}", - self.id, res - ); - } - } - } - } - } - - /// Returns a new owned Vector being a clone of the current record_buffer. - /// This does not affect the current buffer but is costly. - fn get_records_passive(&self) -> Vec { - let mut result = vec![]; - for r in &self.record_buffer { - result.push(Record::new( - r.timestamp, - r.value.clone(), - units::Unit::MicroJoule, - )); - } - result - } -} - -impl CPUSocket { - /// Creates and returns a CPUSocket instance with an empty buffer and no CPUCore owned yet. - fn new( - id: u16, - domains: Vec, - attributes: Vec>>, - counter_uj_path: String, - buffer_max_kbytes: u16, - ) -> CPUSocket { - CPUSocket { - id, - domains, - attributes, - counter_uj_path, - record_buffer: vec![], // buffer has to be empty first - buffer_max_kbytes, - cpu_cores: vec![], // cores are instantiated on a later step - stat_buffer: vec![], - } - } - - /// Adds a new Domain instance to the domains vector if and only if it doesn't exist in the vector already. - fn safe_add_domain(&mut self, domain: Domain) { - if !self.domains.iter().any(|d| d.id == domain.id) { - self.domains.push(domain); - } - } - - /// Returns the content of the energy consumption counter file, as a String - /// value of microjoules. - pub fn read_counter_uj(&self) -> Result> { - match fs::read_to_string(&self.counter_uj_path) { - Ok(result) => Ok(result), - Err(error) => Err(Box::new(error)), - } - } - pub fn read_record_uj(&self) -> Result> { +impl Socket for CPUSocket { + fn read_record_uj(&self) -> Result> { match fs::read_to_string(&self.counter_uj_path) { Ok(result) => Ok(Record::new( current_system_time_since_epoch(), @@ -675,85 +579,9 @@ impl CPUSocket { } } - /// Returns a mutable reference to the domains vector. - pub fn get_domains(&mut self) -> &mut Vec { - &mut self.domains - } - - /// Returns a immutable reference to the domains vector. - pub fn get_domains_passive(&self) -> &Vec { - &self.domains - } - - /// Returns a mutable reference to the CPU cores vector. - pub fn get_cores(&mut self) -> &mut Vec { - &mut self.cpu_cores - } - - /// Returns a immutable reference to the CPU cores vector. - pub fn get_cores_passive(&self) -> &Vec { - &self.cpu_cores - } - - /// Adds a CPU core instance to the cores vector. - pub fn add_cpu_core(&mut self, core: CPUCore) { - self.cpu_cores.push(core); - } - - /// Generates a new CPUStat object storing current usage statistics of the socket - /// and stores it in the stat_buffer. - pub fn refresh_stats(&mut self) { - if !self.stat_buffer.is_empty() { - self.clean_old_stats(); - } - self.stat_buffer.insert(0, self.read_stats().unwrap()); - } - - /// Checks the size in memory of stats_buffer and deletes as many CPUStat - /// instances from the buffer to make it smaller in memory than buffer_max_kbytes. - fn clean_old_stats(&mut self) { - let stat_ptr = &self.stat_buffer[0]; - let size_of_stat = size_of_val(stat_ptr); - let curr_size = size_of_stat * self.stat_buffer.len(); - trace!("current_size of stats in socket {}: {}", self.id, curr_size); - trace!( - "estimated max nb of socket stats: {}", - self.buffer_max_kbytes as f32 * 1000.0 / size_of_stat as f32 - ); - if curr_size > (self.buffer_max_kbytes * 1000) as usize { - let size_diff = curr_size - (self.buffer_max_kbytes * 1000) as usize; - trace!( - "socket {} size_diff: {} size of: {}", - self.id, - size_diff, - size_of_stat - ); - if size_diff > size_of_stat { - let nb_stats_to_delete = size_diff as f32 / size_of_stat as f32; - trace!( - "socket {} nb_stats_to_delete: {} size_diff: {} size of: {}", - self.id, - nb_stats_to_delete, - size_diff, - size_of_stat - ); - trace!("nb stats to delete: {}", nb_stats_to_delete as u32); - for _ in 1..nb_stats_to_delete as u32 { - if !self.stat_buffer.is_empty() { - let res = self.stat_buffer.pop(); - debug!( - "Cleaning stat buffer of socket {}, removing: {:?}", - self.id, res - ); - } - } - } - } - } - /// Combines stats from all CPU cores owned byu the socket and returns /// a CpuTime struct containing stats for the whole socket. - pub fn read_stats(&self) -> Option { + fn read_stats(&self) -> Option { let mut stats = CPUStat { user: 0, nice: 0, @@ -781,96 +609,74 @@ impl CPUSocket { Some(stats) } - /// Computes the difference between previous usage statistics record for the socket - /// and the current one. Returns a CPUStat object containing this difference, field - /// by field. - pub fn get_stats_diff(&mut self) -> Option { - if self.stat_buffer.len() > 1 { - let last = &self.stat_buffer[0]; - let previous = &self.stat_buffer[1]; - let mut iowait = None; - let mut irq = None; - let mut softirq = None; - let mut steal = None; - let mut guest = None; - let mut guest_nice = None; - if last.iowait.is_some() && previous.iowait.is_some() { - iowait = Some(last.iowait.unwrap() - previous.iowait.unwrap()); - } - if last.irq.is_some() && previous.irq.is_some() { - irq = Some(last.irq.unwrap() - previous.irq.unwrap()); - } - if last.softirq.is_some() && previous.softirq.is_some() { - softirq = Some(last.softirq.unwrap() - previous.softirq.unwrap()); - } - if last.steal.is_some() && previous.steal.is_some() { - steal = Some(last.steal.unwrap() - previous.steal.unwrap()); - } - if last.guest.is_some() && previous.guest.is_some() { - guest = Some(last.guest.unwrap() - previous.guest.unwrap()); - } - if last.guest_nice.is_some() && previous.guest_nice.is_some() { - guest_nice = Some(last.guest_nice.unwrap() - previous.guest_nice.unwrap()); - } - return Some(CPUStat { - user: last.user - previous.user, - nice: last.nice - previous.nice, - system: last.system - previous.system, - idle: last.idle - previous.idle, - iowait, - irq, - softirq, - steal, - guest, - guest_nice, - }); - } - None + fn get_id(&self) -> u16 { + self.id } - /// Returns a Record instance containing the power consumed between last - /// and previous measurement, for this CPU socket - pub fn get_records_diff_power_microwatts(&self) -> Option { - if self.record_buffer.len() > 1 { - let last_record = self.record_buffer.last().unwrap(); - let previous_record = self - .record_buffer - .get(self.record_buffer.len() - 2) - .unwrap(); - debug!( - "last_record value: {} previous_record value: {}", - &last_record.value, &previous_record.value - ); - let last_rec_val = last_record.value.trim(); - debug!("l851 : trying to parse {} as u64", last_rec_val); - let prev_rec_val = previous_record.value.trim(); - debug!("l853 : trying to parse {} as u64", prev_rec_val); - if let (Ok(last_microjoules), Ok(previous_microjoules)) = - (last_rec_val.parse::(), prev_rec_val.parse::()) - { - let mut microjoules = 0; - if last_microjoules >= previous_microjoules { - microjoules = last_microjoules - previous_microjoules; - } else { - debug!( - "previous_microjoules ({}) > last_microjoules ({})", - previous_microjoules, last_microjoules - ); - } - let time_diff = - last_record.timestamp.as_secs_f64() - previous_record.timestamp.as_secs_f64(); - let microwatts = microjoules as f64 / time_diff; - debug!("l866: microwatts: {}", microwatts); - return Some(Record::new( - last_record.timestamp, - (microwatts as u64).to_string(), - units::Unit::MicroWatt, - )); - } - } else { - debug!("Not enough records for socket"); + fn get_record_buffer(&mut self) -> &mut Vec { + &mut self.record_buffer + } + + fn get_record_buffer_passive(&self) -> &Vec { + &self.record_buffer + } + + fn get_buffer_max_kbytes(&self) -> u16 { + self.buffer_max_kbytes + } + + /// Returns a mutable reference to the domains vector. + fn get_domains(&mut self) -> &mut Vec { + &mut self.domains + } + + /// Returns a immutable reference to the domains vector. + fn get_domains_passive(&self) -> &Vec { + &self.domains + } + + /// Returns a mutable reference to the CPU cores vector. + fn get_cores(&mut self) -> &mut Vec { + &mut self.cpu_cores + } + + /// Returns a immutable reference to the CPU cores vector. + fn get_cores_passive(&self) -> &Vec { + &self.cpu_cores + } + + fn get_stat_buffer(&mut self) -> &mut Vec { + &mut self.stat_buffer + } + + fn get_stat_buffer_passive(&self) -> &Vec { + &self.stat_buffer + } + + fn get_debug_type(&self) -> String { + String::from("CPU") + } +} + +impl CPUSocket { + /// Creates and returns a CPUSocket instance with an empty buffer and no CPUCore owned yet. + fn new( + id: u16, + domains: Vec, + attributes: Vec>>, + counter_uj_path: String, + buffer_max_kbytes: u16, + ) -> CPUSocket { + CPUSocket { + id, + domains, + attributes, + counter_uj_path, + record_buffer: vec![], // buffer has to be empty first + buffer_max_kbytes, + cpu_cores: vec![], // cores are instantiated on a later step + stat_buffer: vec![], } - None } } diff --git a/src/sensors/powercap_rapl.rs b/src/sensors/powercap_rapl.rs index 208b4207..498e3add 100644 --- a/src/sensors/powercap_rapl.rs +++ b/src/sensors/powercap_rapl.rs @@ -1,5 +1,6 @@ use crate::sensors::Sensor; use crate::sensors::Topology; +use crate::sensors::CPUSocket; use procfs::{modules, KernelModule}; use regex::Regex; use std::collections::HashMap; @@ -82,13 +83,13 @@ impl Sensor for PowercapRAPLSensor { let _ = splitted.next(); let socket_id = String::from(splitted.next().unwrap()).parse().unwrap(); let domain_id = String::from(splitted.next().unwrap()).parse().unwrap(); - topo.safe_add_socket( + topo.safe_add_socket(CPUSocket::new( socket_id, vec![], vec![], format!("{}/intel-rapl:{}/energy_uj", self.base_path, socket_id), self.buffer_per_socket_max_kbytes, - ); + )); if let Ok(domain_name) = &fs::read_to_string(format!("{}/name", folder_name)) { topo.safe_add_domain_to_socket( socket_id, diff --git a/src/sensors/socket.rs b/src/sensors/socket.rs new file mode 100644 index 00000000..a1832ea4 --- /dev/null +++ b/src/sensors/socket.rs @@ -0,0 +1,254 @@ +use core::fmt::Debug; +use std::error::Error; +use std::mem::size_of_val; +use dyn_clone::DynClone; + +use crate::sensors::units; +use super::{Record, Domain, CPUStat, CPUCore, RecordGenerator, StatsGenerator}; + +pub trait Socket: DynClone + Send { + fn read_record_uj(&self) -> Result>; + fn get_record_buffer(&mut self) -> &mut Vec; + fn get_record_buffer_passive(&self) -> &Vec; + fn get_buffer_max_kbytes(&self) -> u16; + fn get_id(&self) -> u16; + fn get_domains_passive(&self) -> &Vec; + fn get_domains(&mut self) -> &mut Vec; + fn get_stat_buffer(&mut self) -> &mut Vec; + fn get_stat_buffer_passive(&self) -> &Vec; + fn read_stats(&self) -> Option; + fn get_cores(&mut self) -> &mut Vec; + fn get_cores_passive(&self) -> &Vec; + fn get_debug_type(&self) -> String; +} + +dyn_clone::clone_trait_object!(Socket); + +impl dyn Socket { + pub fn add_cpu_core(&mut self, core: CPUCore) { + self.get_cores().push(core); + } + + /// Adds a new Domain instance to the domains vector if and only if it doesn't exist in the vector already. + pub fn safe_add_domain(&mut self, domain: Domain) { + let domains = self.get_domains(); + if !domains.iter().any(|d| d.id == domain.id) { + domains.push(domain); + } + } + + /// Returns a Record instance containing the power consumed between last + /// and previous measurement, for this CPU socket + pub fn get_records_diff_power_microwatts(&self) -> Option { + let record_buffer = self.get_record_buffer_passive(); + if record_buffer.len() > 1 { + let last_record = record_buffer.last().unwrap(); + let previous_record = record_buffer + .get(record_buffer.len() - 2) + .unwrap(); + debug!( + "last_record value: {} previous_record value: {}", + &last_record.value, &previous_record.value + ); + let last_rec_val = last_record.value.trim(); + debug!("l851 : trying to parse {} as u64", last_rec_val); + let prev_rec_val = previous_record.value.trim(); + debug!("l853 : trying to parse {} as u64", prev_rec_val); + if let (Ok(last_microjoules), Ok(previous_microjoules)) = + (last_rec_val.parse::(), prev_rec_val.parse::()) + { + let mut microjoules = 0; + if last_microjoules >= previous_microjoules { + microjoules = last_microjoules - previous_microjoules; + } else { + debug!( + "previous_microjoules ({}) > last_microjoules ({})", + previous_microjoules, last_microjoules + ); + } + let time_diff = + last_record.timestamp.as_secs_f64() - previous_record.timestamp.as_secs_f64(); + let microwatts = microjoules as f64 / time_diff; + debug!("l866: microwatts: {}", microwatts); + return Some(Record::new( + last_record.timestamp, + (microwatts as u64).to_string(), + units::Unit::MicroWatt, + )); + } + } else { + debug!("Not enough records for socket"); + } + None + } +} + +impl RecordGenerator for dyn Socket { + fn refresh_record(&mut self) { + if let Ok(record) = self.read_record_uj() { + self.get_record_buffer().push(record); + } + + if !self.get_record_buffer().is_empty() { + self.clean_old_records(); + } + } + + fn clean_old_records(&mut self) { + let buffer_max_kbytes = self.get_buffer_max_kbytes(); + let id = self.get_id(); + let record_buffer = self.get_record_buffer(); + let record_ptr = &record_buffer[0]; + let curr_size = size_of_val(record_ptr) * record_buffer.len(); + trace!( + "socket rebord buffer current size: {} max_bytes: {}", + curr_size, + buffer_max_kbytes * 1000 + ); + if curr_size > (buffer_max_kbytes * 1000) as usize { + let size_diff = curr_size - (buffer_max_kbytes * 1000) as usize; + trace!( + "socket record size_diff: {} sizeof: {}", + size_diff, + size_of_val(record_ptr) + ); + if size_diff > size_of_val(record_ptr) { + let nb_records_to_delete = size_diff as f32 / size_of_val(record_ptr) as f32; + for _ in 1..nb_records_to_delete as u32 { + if !record_buffer.is_empty() { + let res =record_buffer.remove(0); + debug!( + "Cleaning socket id {} records buffer, removing: {}", + id, res + ); + } + } + } + } + } + + fn get_records_passive(&self) -> Vec { + let mut result = vec![]; + for r in self.get_record_buffer_passive() { + result.push(Record::new( + r.timestamp, + r.value.clone(), + units::Unit::MicroJoule, + )); + } + result + } +} + +impl StatsGenerator for dyn Socket { + /// Generates a new CPUStat object storing current usage statistics of the socket + /// and stores it in the stat_buffer. + fn refresh_stats(&mut self) { + let stat_buffer = self.get_stat_buffer_passive(); + if !stat_buffer.is_empty() { + self.clean_old_stats(); + } + let stats = self.read_stats(); + self.get_stat_buffer().insert(0, stats.unwrap()); + } + + /// Checks the size in memory of stats_buffer and deletes as many CPUStat + /// instances from the buffer to make it smaller in memory than buffer_max_kbytes. + fn clean_old_stats(&mut self) { + let id = self.get_id(); + let buffer_max_kbytes = self.get_buffer_max_kbytes(); + let stat_buffer = self.get_stat_buffer(); + let stat_ptr = &stat_buffer[0]; + let size_of_stat = size_of_val(stat_ptr); + let curr_size = size_of_stat * stat_buffer.len(); + trace!("current_size of stats in socket {}: {}", id, curr_size); + trace!( + "estimated max nb of socket stats: {}", + buffer_max_kbytes as f32 * 1000.0 / size_of_stat as f32 + ); + if curr_size > (buffer_max_kbytes * 1000) as usize { + let size_diff = curr_size - (buffer_max_kbytes * 1000) as usize; + trace!( + "socket {} size_diff: {} size of: {}", + id, + size_diff, + size_of_stat + ); + if size_diff > size_of_stat { + let nb_stats_to_delete = size_diff as f32 / size_of_stat as f32; + trace!( + "socket {} nb_stats_to_delete: {} size_diff: {} size of: {}", + id, + nb_stats_to_delete, + size_diff, + size_of_stat + ); + trace!("nb stats to delete: {}", nb_stats_to_delete as u32); + for _ in 1..nb_stats_to_delete as u32 { + if !stat_buffer.is_empty() { + let res = stat_buffer.pop(); + debug!( + "Cleaning stat buffer of socket {}, removing: {:?}", + id, res + ); + } + } + } + } + } + + + /// Computes the difference between previous usage statistics record for the socket + /// and the current one. Returns a CPUStat object containing this difference, field + /// by field. + fn get_stats_diff(&mut self) -> Option { + let stat_buffer = self.get_stat_buffer(); + if stat_buffer.len() > 1 { + let last = &stat_buffer[0]; + let previous = &stat_buffer[1]; + let mut iowait = None; + let mut irq = None; + let mut softirq = None; + let mut steal = None; + let mut guest = None; + let mut guest_nice = None; + if last.iowait.is_some() && previous.iowait.is_some() { + iowait = Some(last.iowait.unwrap() - previous.iowait.unwrap()); + } + if last.irq.is_some() && previous.irq.is_some() { + irq = Some(last.irq.unwrap() - previous.irq.unwrap()); + } + if last.softirq.is_some() && previous.softirq.is_some() { + softirq = Some(last.softirq.unwrap() - previous.softirq.unwrap()); + } + if last.steal.is_some() && previous.steal.is_some() { + steal = Some(last.steal.unwrap() - previous.steal.unwrap()); + } + if last.guest.is_some() && previous.guest.is_some() { + guest = Some(last.guest.unwrap() - previous.guest.unwrap()); + } + if last.guest_nice.is_some() && previous.guest_nice.is_some() { + guest_nice = Some(last.guest_nice.unwrap() - previous.guest_nice.unwrap()); + } + return Some(CPUStat { + user: last.user - previous.user, + nice: last.nice - previous.nice, + system: last.system - previous.system, + idle: last.idle - previous.idle, + iowait, + irq, + softirq, + steal, + guest, + guest_nice, + }); + } + None + } +} + +impl Debug for dyn Socket { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{} Socket (id={})", self.get_debug_type(), self.get_id()) + } +} \ No newline at end of file