Skip to content

Commit 1f842a5

Browse files
committed
feat(datadog): use ureq to send metrics to datadog
Signed-off-by: Jérémie Drouet <[email protected]>
1 parent 7747f75 commit 1f842a5

File tree

4 files changed

+218
-39
lines changed

4 files changed

+218
-39
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ readme = "README.md"
1313

1414
[features]
1515
default = ["datadog"]
16-
datadog = ["datadog-client"]
16+
datadog = ["ureq"]
1717

1818
[dependencies]
1919
loggerv = "0.7.2"
@@ -29,7 +29,7 @@ protobuf = "2.20.0"
2929
serde = { version = "1.0", features = ["derive"] }
3030
serde_json = "1.0"
3131

32-
datadog-client = { version = "0.1", optional = true }
32+
ureq = { version = "2.0.2", features = ["json"], optional = true }
3333

3434
[profile.release]
3535
lto = true

src/exporters/datadog.rs

+176-16
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,178 @@
11
use crate::exporters::*;
22
use crate::sensors::{Sensor, Topology};
3-
use datadog_client::client::{Client, Config};
4-
use datadog_client::metrics::{Point, Serie, Type};
3+
use serde::ser::SerializeSeq;
4+
use serde::{Serialize, Serializer};
55
use std::collections::HashMap;
66
use std::thread;
77
use std::time::{Duration, Instant};
88

9+
#[derive(Clone, Debug)]
10+
pub enum Type {
11+
Count,
12+
Gauge,
13+
Rate,
14+
}
15+
16+
impl Type {
17+
pub fn as_str(&self) -> &str {
18+
match self {
19+
Self::Count => "count",
20+
Self::Gauge => "gauge",
21+
Self::Rate => "rate",
22+
}
23+
}
24+
}
25+
26+
impl Serialize for Type {
27+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
28+
where
29+
S: Serializer,
30+
{
31+
serializer.serialize_str(self.as_str())
32+
}
33+
}
34+
35+
#[derive(Clone, Debug)]
36+
pub struct Point {
37+
timestamp: u64,
38+
value: f64,
39+
}
40+
41+
impl Point {
42+
pub fn new(timestamp: u64, value: f64) -> Self {
43+
Self { timestamp, value }
44+
}
45+
}
46+
47+
impl Serialize for Point {
48+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
49+
where
50+
S: Serializer,
51+
{
52+
let mut seq = serializer.serialize_seq(Some(2))?;
53+
seq.serialize_element(&self.timestamp)?;
54+
seq.serialize_element(&self.value)?;
55+
seq.end()
56+
}
57+
}
58+
59+
/// # Examples
60+
///
61+
/// ```
62+
/// use datadog_client::metrics::{Point, Serie, Type};
63+
///
64+
/// let serie = Serie::new("cpu.usage", Type::Gauge)
65+
/// .set_host("raspberrypi")
66+
/// .set_interval(42)
67+
/// .set_points(vec![])
68+
/// .add_point(Point::new(123456, 12.34))
69+
/// .set_tags(vec![])
70+
/// .add_tag(String::from("whatever:tag"));
71+
/// ```
72+
#[derive(Debug, Clone, Serialize)]
73+
pub struct Serie {
74+
// The name of the host that produced the metric.
75+
#[serde(skip_serializing_if = "Option::is_none")]
76+
host: Option<String>,
77+
// If the type of the metric is rate or count, define the corresponding interval.
78+
#[serde(skip_serializing_if = "Option::is_none")]
79+
interval: Option<i64>,
80+
// The name of the timeseries.
81+
metric: String,
82+
// Points relating to a metric. All points must be tuples with timestamp and a scalar value (cannot be a string).
83+
// Timestamps should be in POSIX time in seconds, and cannot be more than ten minutes in the future or more than one hour in the past.
84+
points: Vec<Point>,
85+
// A list of tags associated with the metric.
86+
tags: Vec<String>,
87+
// The type of the metric either count, gauge, or rate.
88+
#[serde(rename = "type")]
89+
dtype: Type,
90+
}
91+
92+
impl Serie {
93+
pub fn new(metric: &str, dtype: Type) -> Self {
94+
Self {
95+
host: None,
96+
interval: None,
97+
metric: metric.to_string(),
98+
points: Vec::new(),
99+
tags: Vec::new(),
100+
dtype,
101+
}
102+
}
103+
}
104+
105+
impl Serie {
106+
pub fn set_host(mut self, host: &str) -> Self {
107+
self.host = Some(host.to_string());
108+
self
109+
}
110+
111+
pub fn set_interval(mut self, interval: i64) -> Self {
112+
self.interval = Some(interval);
113+
self
114+
}
115+
116+
pub fn set_points(mut self, points: Vec<Point>) -> Self {
117+
self.points = points;
118+
self
119+
}
120+
121+
pub fn add_point(mut self, point: Point) -> Self {
122+
self.points.push(point);
123+
self
124+
}
125+
}
126+
127+
impl Serie {
128+
pub fn set_tags(mut self, tags: Vec<String>) -> Self {
129+
self.tags = tags;
130+
self
131+
}
132+
133+
pub fn add_tag(mut self, tag: String) -> Self {
134+
self.tags.push(tag);
135+
self
136+
}
137+
}
138+
139+
struct Client {
140+
host: String,
141+
api_key: String,
142+
}
143+
144+
impl Client {
145+
pub fn new(parameters: &ArgMatches) -> Self {
146+
Self {
147+
host: parameters.value_of("host").unwrap().to_string(),
148+
api_key: parameters.value_of("api_key").unwrap().to_string(),
149+
}
150+
}
151+
152+
pub fn send(&self, series: &[Serie]) {
153+
let url = format!("{}/api/v1/series", self.host);
154+
let request = ureq::post(url.as_str())
155+
.set("DD-API-KEY", self.api_key.as_str())
156+
.send_json(serde_json::json!({ "series": series }));
157+
match request {
158+
Ok(response) => {
159+
if response.status() >= 400 {
160+
log::warn!(
161+
"couldn't send metrics to datadog: status {}",
162+
response.status_text()
163+
);
164+
if let Ok(body) = response.into_string() {
165+
log::warn!("response from server: {}", body);
166+
}
167+
} else {
168+
log::info!("metrics sent with success");
169+
}
170+
}
171+
Err(err) => log::warn!("error while sending metrics: {}", err),
172+
};
173+
}
174+
}
175+
9176
fn merge<A>(first: Vec<A>, second: Vec<A>) -> Vec<A> {
10177
second.into_iter().fold(first, |mut res, item| {
11178
res.push(item);
@@ -79,15 +246,8 @@ impl DatadogExporter {
79246
}
80247
}
81248

82-
fn build_client(parameters: &ArgMatches) -> Client {
83-
let config = Config::new(
84-
parameters.value_of("host").unwrap().to_string(),
85-
parameters.value_of("api_key").unwrap().to_string(),
86-
);
87-
Client::new(config)
88-
}
89-
90-
fn runner(&mut self, parameters: &ArgMatches) {
249+
fn runner(&mut self, parameters: &ArgMatches<'_>) {
250+
let client = Client::new(parameters);
91251
if let Some(timeout) = parameters.value_of("timeout") {
92252
let now = Instant::now();
93253
let timeout = timeout
@@ -110,18 +270,18 @@ impl DatadogExporter {
110270
info!("Measurement step is: {}s", step_duration);
111271

112272
while now.elapsed().as_secs() <= timeout {
113-
self.iterate(parameters);
273+
self.iterate(&client);
114274
thread::sleep(Duration::new(step_duration, step_duration_nano));
115275
}
116276
} else {
117-
self.iterate(parameters);
277+
self.iterate(&client);
118278
}
119279
}
120280

121-
fn iterate(&mut self, parameters: &ArgMatches) {
281+
fn iterate(&mut self, client: &Client) {
122282
self.topology.refresh();
123-
let _series = self.collect_series();
124-
let _client = Self::build_client(parameters);
283+
let series = self.collect_series();
284+
client.send(&series);
125285
}
126286

127287
fn create_consumption_serie(&self) -> Serie {

src/lib.rs

+38-21
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ pub mod exporters;
55
pub mod sensors;
66
use clap::ArgMatches;
77
use exporters::{
8-
json::JSONExporter, prometheus::PrometheusExporter, qemu::QemuExporter,
9-
riemann::RiemannExporter, stdout::StdoutExporter, Exporter, ExporterOption,
8+
datadog::DatadogExporter, json::JSONExporter, prometheus::PrometheusExporter,
9+
qemu::QemuExporter, riemann::RiemannExporter, stdout::StdoutExporter, Exporter, ExporterOption,
1010
};
1111
use sensors::{powercap_rapl::PowercapRAPLSensor, Sensor};
1212
use std::collections::HashMap;
@@ -52,32 +52,44 @@ fn get_sensor(matches: &ArgMatches) -> Box<dyn Sensor> {
5252
pub fn run(matches: ArgMatches) {
5353
loggerv::init_with_verbosity(matches.occurrences_of("v")).unwrap();
5454

55-
let sensor_boxed = get_sensor(&matches);
56-
let exporter_parameters;
57-
5855
if let Some(stdout_exporter_parameters) = matches.subcommand_matches("stdout") {
59-
exporter_parameters = stdout_exporter_parameters.clone();
60-
let mut exporter = StdoutExporter::new(sensor_boxed);
56+
let exporter_parameters = stdout_exporter_parameters.clone();
57+
let mut exporter = StdoutExporter::new(get_sensor(&matches));
6158
exporter.run(exporter_parameters);
62-
} else if let Some(json_exporter_parameters) = matches.subcommand_matches("json") {
63-
exporter_parameters = json_exporter_parameters.clone();
64-
let mut exporter = JSONExporter::new(sensor_boxed);
59+
return;
60+
}
61+
if let Some(json_exporter_parameters) = matches.subcommand_matches("json") {
62+
let exporter_parameters = json_exporter_parameters.clone();
63+
let mut exporter = JSONExporter::new(get_sensor(&matches));
6564
exporter.run(exporter_parameters);
66-
} else if let Some(riemann_exporter_parameters) = matches.subcommand_matches("riemann") {
67-
exporter_parameters = riemann_exporter_parameters.clone();
68-
let mut exporter = RiemannExporter::new(sensor_boxed);
65+
return;
66+
}
67+
if let Some(riemann_exporter_parameters) = matches.subcommand_matches("riemann") {
68+
let exporter_parameters = riemann_exporter_parameters.clone();
69+
let mut exporter = RiemannExporter::new(get_sensor(&matches));
6970
exporter.run(exporter_parameters);
70-
} else if let Some(prometheus_exporter_parameters) = matches.subcommand_matches("prometheus") {
71-
exporter_parameters = prometheus_exporter_parameters.clone();
72-
let mut exporter = PrometheusExporter::new(sensor_boxed);
71+
return;
72+
}
73+
if let Some(prometheus_exporter_parameters) = matches.subcommand_matches("prometheus") {
74+
let exporter_parameters = prometheus_exporter_parameters.clone();
75+
let mut exporter = PrometheusExporter::new(get_sensor(&matches));
7376
exporter.run(exporter_parameters);
74-
} else if let Some(qemu_exporter_parameters) = matches.subcommand_matches("qemu") {
75-
exporter_parameters = qemu_exporter_parameters.clone();
76-
let mut exporter = QemuExporter::new(sensor_boxed);
77+
return;
78+
}
79+
if let Some(qemu_exporter_parameters) = matches.subcommand_matches("qemu") {
80+
let exporter_parameters = qemu_exporter_parameters.clone();
81+
let mut exporter = QemuExporter::new(get_sensor(&matches));
7782
exporter.run(exporter_parameters);
78-
} else {
79-
error!("Couldn't determine which exporter has been chosen.");
83+
return;
8084
}
85+
#[cfg(feature = "datadog")]
86+
if let Some(datadog_exporter_parameters) = matches.subcommand_matches("datadog") {
87+
let exporter_parameters = datadog_exporter_parameters.clone();
88+
let mut exporter = DatadogExporter::new(get_sensor(&matches));
89+
exporter.run(exporter_parameters);
90+
return;
91+
}
92+
error!("Couldn't determine which exporter has been chosen.");
8193
}
8294

8395
/// Returns options needed for each exporter as a HashMap.
@@ -104,6 +116,11 @@ pub fn get_exporters_options() -> HashMap<String, HashMap<String, ExporterOption
104116
String::from("qemu"),
105117
exporters::qemu::QemuExporter::get_options(),
106118
);
119+
#[cfg(feature = "datadog")]
120+
options.insert(
121+
String::from("datadog"),
122+
exporters::datadog::DatadogExporter::get_options(),
123+
);
107124
options
108125
}
109126

src/main.rs

+2
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ fn main() {
6363
"prometheus" => "Prometheus exporter exposes power consumption metrics on an http endpoint (/metrics is default) in prometheus accepted format",
6464
"riemann" => "Riemann exporter sends power consumption metrics to a Riemann server",
6565
"qemu" => "Qemu exporter watches all Qemu/KVM virtual machines running on the host and exposes metrics of each of them in a dedicated folder",
66+
#[cfg(feature = "datadog")]
67+
"datadog" => "Datadog exporter sends power consumption metrics to Datadog",
6668
_ => "Unknown exporter",
6769
}
6870
);

0 commit comments

Comments
 (0)