Skip to content

Commit dabe5c7

Browse files
15skumarShivangi Kumar
andcommitted
Otlp exporter (#1464)
This PR adds an implementation of OpenTelemetry Exporting of metrics through the OpenTelemetry protocol (OTLP). Changes are: a new OtlpMetricsExporter struct which handles exporting metrics to an OTLP endpoint, and integration of the OTLP exporter with the existing metrics system. Testing: I tested the implementation with a test otlp_metrics() in metrics.rs and ran a docker container running the OpenTelemetry Collector at the default port docker run -d --name otel-collector \ -p 4318:4318 -p 4317:4317 \ -v $(pwd)/collector-config.yaml:/etc/otelcol/config.yaml \ otel/opentelemetry-collector-contrib:latest Once I ran the test, I verified that the test metrics can be viewed in the collector logs. (viewed using 'docker logs otel-collector'). Here is a screenshot of an example of a test metric collected at the endpoint: <img width="391" alt="Screenshot 2025-06-18 at 15 32 16" src="https://github.com/user-attachments/assets/aab7e20a-0472-495b-af1d-23e966495e21" /> --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). --------- Signed-off-by: Shivangi Kumar <[email protected]> Co-authored-by: Shivangi Kumar <[email protected]>
1 parent 0d8312a commit dabe5c7

File tree

9 files changed

+719
-16
lines changed

9 files changed

+719
-16
lines changed

Cargo.lock

Lines changed: 288 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mountpoint-s3-fs/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,17 @@ time = { version = "0.3.41", features = ["macros", "formatting", "serde-well-kno
5050
tracing = { version = "0.1.41", features = ["log"] }
5151
tracing-log = "0.2.0"
5252
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
53+
opentelemetry = { version = "0.30.0", features = ["metrics"] }
54+
opentelemetry_sdk = { version = "0.30.0", features = ["metrics", "rt-tokio"] }
55+
opentelemetry-otlp = { version = "0.30.0", features = ["metrics", "http-proto"] }
5356

5457
[target.'cfg(target_os = "linux")'.dependencies]
5558
procfs = { version = "0.17.0", default-features = false }
5659

5760
[dev-dependencies]
5861
mountpoint-s3-client = { path = "../mountpoint-s3-client", features = ["mock"] }
62+
opentelemetry-proto = { version = "0.30.0", features = ["metrics", "gen-tonic"] }
63+
prost = "0.12.0"
5964

6065
assert_cmd = "2.0.17"
6166
assert_fs = "1.1.3"

mountpoint-s3-fs/examples/mount_from_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ fn process_manifests(config: &ConfigOptions, database_directory: &Path) -> Resul
193193

194194
fn setup_logging(config: &ConfigOptions) -> Result<(LoggingHandle, MetricsSinkHandle)> {
195195
let logging = init_logging(config.build_logging_config())?;
196-
let metrics = metrics::install();
196+
let metrics = metrics::install(None).map_err(|e| anyhow!("Failed to initialize metrics: {}", e))?;
197197
Ok((logging, metrics))
198198
}
199199

mountpoint-s3-fs/examples/prefetch_benchmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl CliArgs {
159159

160160
fn main() -> anyhow::Result<()> {
161161
init_tracing_subscriber();
162-
let _metrics_handle = mountpoint_s3_fs::metrics::install();
162+
let _metrics_handle = mountpoint_s3_fs::metrics::install(None);
163163

164164
let args = CliArgs::parse();
165165

mountpoint-s3-fs/examples/upload_benchmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ struct UploadBenchmarkArgs {
9090

9191
fn main() {
9292
init_tracing_subscriber();
93-
let _metrics_handle = mountpoint_s3_fs::metrics::install();
93+
let _metrics_handle = mountpoint_s3_fs::metrics::install(None);
9494

9595
let args = UploadBenchmarkArgs::parse();
9696
println!("starting upload benchmark with {:?}", &args);

mountpoint-s3-fs/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod mem_limiter;
1414
pub mod memory;
1515
pub mod metablock;
1616
pub mod metrics;
17+
pub mod metrics_otel;
1718
pub mod object;
1819
pub mod prefetch;
1920
pub mod prefix;

mountpoint-s3-fs/src/metrics.rs

Lines changed: 207 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
//! This module hooks up the [metrics](https://docs.rs/metrics) facade to a metrics sink that
44
//! currently just emits them to a tracing log entry.
55
6+
use crate::metrics_otel::{OtlpConfig, OtlpMetricsExporter};
7+
use opentelemetry::KeyValue;
8+
69
use std::thread::{self, JoinHandle};
710
use std::time::Duration;
811

@@ -30,8 +33,8 @@ pub const TARGET_NAME: &str = "mountpoint_s3_fs::metrics";
3033
/// done with their work; metrics generated after shutting down the sink will be lost.
3134
///
3235
/// Panics if a sink has already been installed.
33-
pub fn install() -> MetricsSinkHandle {
34-
let sink = Arc::new(MetricsSink::new());
36+
pub fn install(otlp_config: Option<OtlpConfig>) -> anyhow::Result<MetricsSinkHandle> {
37+
let sink = Arc::new(MetricsSink::new(otlp_config)?);
3538
let mut sys = System::new();
3639

3740
let (tx, rx) = channel();
@@ -62,9 +65,10 @@ pub fn install() -> MetricsSinkHandle {
6265
};
6366

6467
let recorder = MetricsRecorder { sink };
65-
metrics::set_global_recorder(recorder).unwrap();
68+
metrics::set_global_recorder(recorder)
69+
.map_err(|e| anyhow::anyhow!("Failed to set global metrics recorder: {}", e))?;
6670

67-
handle
71+
Ok(handle)
6872
}
6973

7074
/// Report process level metrics
@@ -90,13 +94,46 @@ fn poll_process_metrics(sys: &mut System) {
9094
#[derive(Debug)]
9195
struct MetricsSink {
9296
metrics: DashMap<Key, Metric>,
97+
otlp_exporter: Option<OtlpMetricsExporter>,
9398
}
9499

95100
impl MetricsSink {
96-
fn new() -> Self {
97-
Self {
101+
fn new(otlp_config: Option<OtlpConfig>) -> anyhow::Result<Self> {
102+
// Initialise the OTLP exporter if a config is provided
103+
let otlp_exporter = if let Some(config) = otlp_config {
104+
// Basic validation of the endpoint URL
105+
if !config.endpoint.starts_with("http://") && !config.endpoint.starts_with("https://") {
106+
return Err(anyhow::anyhow!(
107+
"Invalid OTLP endpoint configuration: endpoint must start with http:// or https://"
108+
));
109+
}
110+
111+
match OtlpMetricsExporter::new(&config) {
112+
Ok(exporter) => {
113+
tracing::info!("OpenTelemetry metrics export enabled to {}", config.endpoint);
114+
Some(exporter)
115+
}
116+
Err(e) => {
117+
tracing::error!("Failed to initialise OTLP exporter: {}", e);
118+
119+
// If the user explicitly requested metrics export but it failed,
120+
// we should return an error rather than silently continuing without metrics
121+
return Err(anyhow::anyhow!(
122+
"Failed to initialize OTLP metrics exporter: {}. If metrics export is not required, omit the OTLP configuration.",
123+
e
124+
));
125+
}
126+
}
127+
} else {
128+
// No OTLP config provided, running without OpenTelemetry metrics export
129+
tracing::debug!("Running without OpenTelemetry metrics export");
130+
None
131+
};
132+
133+
Ok(Self {
98134
metrics: DashMap::with_capacity(64),
99-
}
135+
otlp_exporter,
136+
})
100137
}
101138

102139
fn counter(&self, key: &Key) -> metrics::Counter {
@@ -115,12 +152,43 @@ impl MetricsSink {
115152
}
116153

117154
/// Publish all this sink's metrics to `tracing` log messages
155+
/// Send metrics to OTLP if enabled
118156
fn publish(&self) {
119157
// Collect the output lines so we can sort them to make reading easier
120158
let mut metrics = vec![];
121159

122160
for mut entry in self.metrics.iter_mut() {
123161
let (key, metric) = entry.pair_mut();
162+
163+
// If OTLP export is enabled, also send metrics to OpenTelemetry
164+
if let Some(exporter) = &self.otlp_exporter {
165+
// Convert labels to OpenTelemetry KeyValue pairs
166+
let attributes: Vec<KeyValue> = key
167+
.labels()
168+
.map(|label| KeyValue::new(label.key().to_string(), label.value().to_string()))
169+
.collect();
170+
171+
// Record the metric based on its type
172+
match metric {
173+
Metric::Counter(counter) => {
174+
if let Some((value, _)) = counter.load_and_reset() {
175+
exporter.record_counter(key, value, &attributes);
176+
}
177+
}
178+
Metric::Gauge(gauge) => {
179+
if let Some(value) = gauge.load_if_changed() {
180+
exporter.record_gauge(key, value, &attributes);
181+
}
182+
}
183+
Metric::Histogram(histogram) => {
184+
histogram.run_and_reset(|h| {
185+
let value = h.mean();
186+
exporter.record_histogram(key, value, &attributes);
187+
});
188+
}
189+
}
190+
}
191+
124192
let Some(metric) = metric.fmt_and_reset() else {
125193
continue;
126194
};
@@ -219,7 +287,7 @@ mod tests {
219287

220288
#[test]
221289
fn basic_metrics() {
222-
let sink = Arc::new(MetricsSink::new());
290+
let sink = Arc::new(MetricsSink::new(None).unwrap());
223291
let recorder = MetricsRecorder { sink: sink.clone() };
224292
with_local_recorder(&recorder, || {
225293
// Run twice to check reset works
@@ -310,4 +378,135 @@ mod tests {
310378
}
311379
});
312380
}
381+
382+
/// This is a manual test for verifying the integration of the metrics system with OpenTelemetry.
383+
/// It provides end-to-end verification of the metrics pipeline without needing to run the full mountpoint application.
384+
///
385+
/// # Requirements
386+
/// - An OpenTelemetry collector running at the specified endpoint (default: http://localhost:4318/v1/metrics)
387+
///
388+
/// # How to run
389+
/// ```bash
390+
/// # Start the OpenTelemetry collector (e.g., using Docker)
391+
/// docker run -p 4317:4317 -p 4318:4318 -v $(pwd)/collector-config.yaml:/etc/otel-collector-config.yaml \
392+
/// otel/opentelemetry-collector:latest --config=/etc/otel-collector-config.yaml
393+
///
394+
/// # Run the test with default endpoint (ignored by default)
395+
/// cargo test --package mountpoint-s3-fs --lib -- metrics::tests::otlp_metrics --exact --ignored
396+
///
397+
/// # Or run with a custom endpoint by setting the MOUNTPOINT_TEST_OTLP_ENDPOINT environment variable
398+
/// MOUNTPOINT_TEST_OTLP_ENDPOINT="http://custom-server:4318/v1/metrics" cargo test --package mountpoint-s3-fs --lib -- metrics::tests::otlp_metrics --exact --ignored
399+
///
400+
/// # Verify metrics in collector logs
401+
/// ```
402+
#[test]
403+
#[ignore]
404+
fn otlp_metrics() {
405+
use tracing::info;
406+
use tracing_subscriber::fmt::format::FmtSpan;
407+
use tracing_subscriber::util::SubscriberInitExt;
408+
409+
// Initialize tracing for better test output
410+
tracing_subscriber::fmt()
411+
.with_span_events(FmtSpan::CLOSE)
412+
.with_target(false)
413+
.with_thread_ids(true)
414+
.with_level(true)
415+
.with_file(true)
416+
.with_line_number(true)
417+
.with_test_writer()
418+
.set_default();
419+
420+
info!("Starting OTLP metrics test...");
421+
422+
// Get OTLP endpoint from environment variable or use default
423+
let endpoint = std::env::var("MOUNTPOINT_TEST_OTLP_ENDPOINT")
424+
.unwrap_or_else(|_| "http://localhost:4318/v1/metrics".to_string());
425+
426+
info!("Using OTLP endpoint: {}", endpoint);
427+
428+
// Initialize metrics with an OTLP config
429+
let config = OtlpConfig::new(&endpoint).with_interval_secs(1);
430+
let sink = Arc::new(MetricsSink::new(Some(config)).unwrap());
431+
let recorder = MetricsRecorder { sink: sink.clone() };
432+
433+
with_local_recorder(&recorder, || {
434+
// Test counter with multiple labels
435+
let counter = metrics::counter!(
436+
"mountpoint_test_counter",
437+
"operation" => "write",
438+
"status" => "success",
439+
"test" => "true"
440+
);
441+
counter.increment(100);
442+
counter.increment(50);
443+
info!("Recorded counter with total value 150");
444+
445+
// Test gauge with updates
446+
let gauge = metrics::gauge!(
447+
"mountpoint_test_gauge",
448+
"component" => "cache",
449+
"test" => "true"
450+
);
451+
gauge.set(1000.0);
452+
info!("Set gauge to 1000.0");
453+
gauge.set(500.0);
454+
info!("Updated gauge to 500.0");
455+
456+
// Test histogram with multiple records
457+
let histogram = metrics::histogram!(
458+
"mountpoint_test_histogram",
459+
"operation" => "read",
460+
"test" => "true"
461+
);
462+
histogram.record(10.0);
463+
histogram.record(20.0);
464+
histogram.record(30.0);
465+
info!("Recorded histogram values: 10.0, 20.0, 30.0");
466+
467+
// Publish metrics immediately to verify initial values
468+
info!("Publishing initial metrics...");
469+
sink.publish();
470+
471+
// Sleep to allow metrics to be exported
472+
std::thread::sleep(std::time::Duration::from_secs(2));
473+
474+
// Update metrics to verify changes are tracked
475+
counter.increment(200);
476+
gauge.set(750.0);
477+
histogram.record(40.0);
478+
info!("Updated all metrics with new values");
479+
480+
// Publish again to verify updates
481+
info!("Publishing updated metrics...");
482+
sink.publish();
483+
484+
// Wait for final export
485+
std::thread::sleep(std::time::Duration::from_secs(5));
486+
info!("Test complete. Metrics should show in collector logs.");
487+
});
488+
}
489+
490+
#[test]
491+
fn test_otlp_endpoint_validation() {
492+
// Test with an invalid URI - we need to directly test the MetricsSink::new function
493+
// since install() will try to set up a global recorder which can only be done once
494+
let config = OtlpConfig::new("not-a-valid-uri");
495+
let result = MetricsSink::new(Some(config));
496+
assert!(result.is_err());
497+
let error = result.unwrap_err().to_string();
498+
assert!(
499+
error.contains("Invalid OTLP endpoint configuration"),
500+
"Error message should indicate invalid configuration: {error}"
501+
);
502+
503+
// Test with no OTLP config (should succeed)
504+
let result = MetricsSink::new(None);
505+
assert!(result.is_ok());
506+
507+
// Test with a syntactically valid endpoint (should succeed)
508+
let config = OtlpConfig::new("http://example.com:4318/v1/metrics");
509+
let result = MetricsSink::new(Some(config));
510+
assert!(result.is_ok());
511+
}
313512
}

0 commit comments

Comments
 (0)