Skip to content

Commit

Permalink
feat(datadog): use ureq to send metrics to datadog
Browse files Browse the repository at this point in the history
Signed-off-by: Jérémie Drouet <[email protected]>
  • Loading branch information
jdrouet committed Mar 11, 2021
1 parent cf9a628 commit 07b0533
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 44 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"

[features]
default = ["datadog"]
datadog = ["datadog-client"]
datadog = ["ureq"]

[dependencies]
loggerv = "0.7.2"
Expand All @@ -22,7 +22,6 @@ clap = "2.33.3"
regex = "1"
procfs = "0.8.1"
actix-web = "3"
futures = "0.3"
riemann_client = "0.9.0"
hostname = "0.3.1"
protobuf = "2.20.0"
Expand All @@ -31,7 +30,7 @@ serde_json = "1.0"
warp10 = "1.0.0"
time = "0.2.25"

datadog-client = { version = "0.1", optional = true }
ureq = { version = "2.0.2", features = ["json"], optional = true }

[profile.release]
lto = true
Expand Down
179 changes: 163 additions & 16 deletions src/exporters/datadog.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,165 @@
use crate::exporters::*;
use crate::sensors::{Sensor, Topology};
use datadog_client::client::{Client, Config};
use datadog_client::metrics::{Point, Serie, Type};
use serde::ser::SerializeSeq;
use serde::{Serialize, Serializer};
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};

#[derive(Clone, Debug)]
pub enum Type {
Count,
Gauge,
Rate,
}

impl Type {
pub fn as_str(&self) -> &str {
match self {
Self::Count => "count",
Self::Gauge => "gauge",
Self::Rate => "rate",
}
}
}

impl Serialize for Type {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.as_str())
}
}

#[derive(Clone, Debug)]
pub struct Point {
timestamp: u64,
value: f64,
}

impl Point {
pub fn new(timestamp: u64, value: f64) -> Self {
Self { timestamp, value }
}
}

impl Serialize for Point {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(2))?;
seq.serialize_element(&self.timestamp)?;
seq.serialize_element(&self.value)?;
seq.end()
}
}

#[derive(Debug, Clone, Serialize)]
pub struct Serie {
// The name of the host that produced the metric.
#[serde(skip_serializing_if = "Option::is_none")]
host: Option<String>,
// If the type of the metric is rate or count, define the corresponding interval.
#[serde(skip_serializing_if = "Option::is_none")]
interval: Option<i64>,
// The name of the timeseries.
metric: String,
// Points relating to a metric. All points must be tuples with timestamp and a scalar value (cannot be a string).
// Timestamps should be in POSIX time in seconds, and cannot be more than ten minutes in the future or more than one hour in the past.
points: Vec<Point>,
// A list of tags associated with the metric.
tags: Vec<String>,
// The type of the metric either count, gauge, or rate.
#[serde(rename = "type")]
dtype: Type,
}

impl Serie {
pub fn new(metric: &str, dtype: Type) -> Self {
Self {
host: None,
interval: None,
metric: metric.to_string(),
points: Vec::new(),
tags: Vec::new(),
dtype,
}
}
}

impl Serie {
pub fn set_host(mut self, host: &str) -> Self {
self.host = Some(host.to_string());
self
}

pub fn set_interval(mut self, interval: i64) -> Self {
self.interval = Some(interval);
self
}

pub fn set_points(mut self, points: Vec<Point>) -> Self {
self.points = points;
self
}

pub fn add_point(mut self, point: Point) -> Self {
self.points.push(point);
self
}
}

impl Serie {
pub fn set_tags(mut self, tags: Vec<String>) -> Self {
self.tags = tags;
self
}

pub fn add_tag(mut self, tag: String) -> Self {
self.tags.push(tag);
self
}
}

struct Client {
host: String,
api_key: String,
}

impl Client {
pub fn new(parameters: &ArgMatches) -> Self {
Self {
host: parameters.value_of("host").unwrap().to_string(),
api_key: parameters.value_of("api_key").unwrap().to_string(),
}
}

pub fn send(&self, series: &[Serie]) {
let url = format!("{}/api/v1/series", self.host);
let request = ureq::post(url.as_str())
.set("DD-API-KEY", self.api_key.as_str())
.send_json(serde_json::json!({ "series": series }));
match request {
Ok(response) => {
if response.status() >= 400 {
log::warn!(
"couldn't send metrics to datadog: status {}",
response.status_text()
);
if let Ok(body) = response.into_string() {
log::warn!("response from server: {}", body);
}
} else {
log::info!("metrics sent with success");
}
}
Err(err) => log::warn!("error while sending metrics: {}", err),
};
}
}

fn merge<A>(first: Vec<A>, second: Vec<A>) -> Vec<A> {
second.into_iter().fold(first, |mut res, item| {
res.push(item);
Expand Down Expand Up @@ -79,15 +233,8 @@ impl DatadogExporter {
}
}

fn build_client(parameters: &ArgMatches) -> Client {
let config = Config::new(
parameters.value_of("host").unwrap().to_string(),
parameters.value_of("api_key").unwrap().to_string(),
);
Client::new(config)
}

fn runner(&mut self, parameters: &ArgMatches) {
fn runner(&mut self, parameters: &ArgMatches<'_>) {
let client = Client::new(parameters);
if let Some(timeout) = parameters.value_of("timeout") {
let now = Instant::now();
let timeout = timeout
Expand All @@ -110,18 +257,18 @@ impl DatadogExporter {
info!("Measurement step is: {}s", step_duration);

while now.elapsed().as_secs() <= timeout {
self.iterate(parameters);
self.iterate(&client);
thread::sleep(Duration::new(step_duration, step_duration_nano));
}
} else {
self.iterate(parameters);
self.iterate(&client);
}
}

fn iterate(&mut self, parameters: &ArgMatches) {
fn iterate(&mut self, client: &Client) {
self.topology.refresh();
let _series = self.collect_series();
let _client = Self::build_client(parameters);
let series = self.collect_series();
client.send(&series);
}

fn create_consumption_serie(&self) -> Serie {
Expand Down
69 changes: 44 additions & 25 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ pub mod exporters;
pub mod sensors;
use clap::ArgMatches;
use exporters::{
json::JSONExporter, prometheus::PrometheusExporter, qemu::QemuExporter,
riemann::RiemannExporter, stdout::StdoutExporter, warpten::Warp10Exporter, Exporter,
ExporterOption,
datadog::DatadogExporter, json::JSONExporter, prometheus::PrometheusExporter,
qemu::QemuExporter, riemann::RiemannExporter, stdout::StdoutExporter, warpten::Warp10Exporter,
Exporter, ExporterOption,
};
use sensors::{powercap_rapl::PowercapRAPLSensor, Sensor};
use std::collections::HashMap;
Expand Down Expand Up @@ -53,36 +53,50 @@ fn get_sensor(matches: &ArgMatches) -> Box<dyn Sensor> {
pub fn run(matches: ArgMatches) {
loggerv::init_with_verbosity(matches.occurrences_of("v")).unwrap();

let sensor_boxed = get_sensor(&matches);
let exporter_parameters;

if let Some(stdout_exporter_parameters) = matches.subcommand_matches("stdout") {
exporter_parameters = stdout_exporter_parameters.clone();
let mut exporter = StdoutExporter::new(sensor_boxed);
let exporter_parameters = stdout_exporter_parameters.clone();
let mut exporter = StdoutExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(json_exporter_parameters) = matches.subcommand_matches("json") {
exporter_parameters = json_exporter_parameters.clone();
let mut exporter = JSONExporter::new(sensor_boxed);
return;
}
if let Some(json_exporter_parameters) = matches.subcommand_matches("json") {
let exporter_parameters = json_exporter_parameters.clone();
let mut exporter = JSONExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(riemann_exporter_parameters) = matches.subcommand_matches("riemann") {
exporter_parameters = riemann_exporter_parameters.clone();
let mut exporter = RiemannExporter::new(sensor_boxed);
return;
}
if let Some(riemann_exporter_parameters) = matches.subcommand_matches("riemann") {
let exporter_parameters = riemann_exporter_parameters.clone();
let mut exporter = RiemannExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
return;
}
if let Some(prometheus_exporter_parameters) = matches.subcommand_matches("prometheus") {
let exporter_parameters = prometheus_exporter_parameters.clone();
let mut exporter = PrometheusExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(prometheus_exporter_parameters) = matches.subcommand_matches("prometheus") {
exporter_parameters = prometheus_exporter_parameters.clone();
let mut exporter = PrometheusExporter::new(sensor_boxed);
return;
}
if let Some(qemu_exporter_parameters) = matches.subcommand_matches("qemu") {
let exporter_parameters = qemu_exporter_parameters.clone();
let mut exporter = QemuExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(qemu_exporter_parameters) = matches.subcommand_matches("qemu") {
exporter_parameters = qemu_exporter_parameters.clone();
let mut exporter = QemuExporter::new(sensor_boxed);
return;
}
if let Some(warp10_exporter_parameters) = matches.subcommand_matches("warp10") {
let exporter_parameters = warp10_exporter_parameters.clone();
let mut exporter = Warp10Exporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(warp10_exporter_parameters) = matches.subcommand_matches("warp10") {
exporter_parameters = warp10_exporter_parameters.clone();
let mut exporter = Warp10Exporter::new(sensor_boxed);
return;
}
#[cfg(feature = "datadog")]
if let Some(datadog_exporter_parameters) = matches.subcommand_matches("datadog") {
let exporter_parameters = datadog_exporter_parameters.clone();
let mut exporter = DatadogExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else {
error!("Couldn't determine which exporter has been chosen.");
return;
}
error!("Couldn't determine which exporter has been chosen.");
}

/// Returns options needed for each exporter as a HashMap.
Expand Down Expand Up @@ -113,6 +127,11 @@ pub fn get_exporters_options() -> HashMap<String, HashMap<String, ExporterOption
String::from("warp10"),
exporters::warpten::Warp10Exporter::get_options(),
);
#[cfg(feature = "datadog")]
options.insert(
String::from("datadog"),
exporters::datadog::DatadogExporter::get_options(),
);
options
}

Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ fn main() {
"riemann" => "Riemann exporter sends power consumption metrics to a Riemann server",
"qemu" => "Qemu exporter watches all Qemu/KVM virtual machines running on the host and exposes metrics of each of them in a dedicated folder",
"warp10" => "Warp10 exporter sends data to a Warp10 host, through HTTP",
#[cfg(feature = "datadog")]
"datadog" => "Datadog exporter sends power consumption metrics to Datadog",
_ => "Unknown exporter",
}
);
Expand Down

0 comments on commit 07b0533

Please sign in to comment.