101 changes: 101 additions & 0 deletions docs/examples/metrics/producer/README.rst
@@ -0,0 +1,101 @@
# MetricProducer Examples

This directory contains examples of how to implement and use the `MetricProducer` interface to bridge third-party metric sources with OpenTelemetry.

## What is MetricProducer?

`MetricProducer` is an interface defined in the OpenTelemetry specification that allows you to plug third-party metric sources into an OpenTelemetry `MetricReader`. This is particularly useful for:

- Bridging existing monitoring systems to OpenTelemetry
- Integrating with systems like Prometheus, StatsD, or custom monitoring solutions
- Collecting pre-processed metrics from external sources

## Key Concepts

- **MetricProducer**: Interface that defines how to produce metrics from third-party sources
- **MetricReader**: Collects metrics from both the OpenTelemetry SDK and registered MetricProducers
- **Pre-processed data**: Unlike OpenTelemetry instruments that collect raw measurements, MetricProducers work with already aggregated metrics

## Examples

### basic_example.py

A comprehensive example showing:
- How to implement `MetricProducer` for different systems (Prometheus simulation, custom system)
- How to convert third-party metric formats to OpenTelemetry `MetricsData`
- How to register producers with a `MetricReader`
- How both SDK metrics and producer metrics are combined
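
As a rough illustration of the conversion step, the sketch below parses a small subset of the Prometheus text exposition format into the dictionary shape that `basic_example.py` uses for its simulated data. The helper name `parse_prometheus_text` and the `_total` suffix heuristic are assumptions made for illustration only. A real bridge would more likely rely on an existing Prometheus parser.

```python
import re
from typing import Dict

# Matches `name{labels} value` or `name value`. Timestamps, escaped quotes,
# and other parts of the full format are deliberately not handled.
_SAMPLE_RE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)')
_LABEL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"')


def parse_prometheus_text(text: str) -> Dict[str, dict]:
    """Parse a small subset of the Prometheus text format (hypothetical helper)."""
    metrics: Dict[str, dict] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = _SAMPLE_RE.match(line)
        if match is None:
            continue  # ignore lines this simplified parser does not understand
        name, raw_labels, raw_value = match.groups()
        # Simplification: one sample per metric name; later series overwrite earlier ones.
        metrics[name] = {
            "value": float(raw_value),
            "labels": dict(_LABEL_RE.findall(raw_labels or "")),
            # Assumption: a `_total` suffix marks a monotonic counter.
            "monotonic": name.endswith("_total"),
        }
    return metrics


sample = '''
# TYPE http_requests_total counter
http_requests_total{method="GET",status="200"} 12345
memory_usage_bytes{instance="server-1"} 536870912
'''
parsed = parse_prometheus_text(sample)
```

Each entry in `parsed` can then be mapped onto a `Metric` with a `NumberDataPoint`, in the same way the simulated data is converted in the example file.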

## Running the Examples

```bash
# From the repo root
cd docs/examples/metrics/producer
python basic_example.py
```

## Implementation Pattern

When implementing a `MetricProducer`:

1. **Inherit from MetricProducer**: Create a class that extends the abstract base class
2. **Implement produce()**: This method should fetch and convert metrics to OpenTelemetry format
3. **Handle errors gracefully**: Your producer should not crash the entire collection process
4. **Respect timeout**: The `produce()` method receives a `timeout_millis` parameter; return before that time budget elapses
5. **Return MetricsData**: Convert your metrics to the standard OpenTelemetry format

```python
from opentelemetry.sdk.metrics.export import MetricProducer, MetricsData

class MyMetricProducer(MetricProducer):
    def produce(self, timeout_millis: float = 10_000) -> MetricsData:
        # Fetch metrics from your source
        raw_metrics = self.fetch_from_source()

        # Convert to OpenTelemetry format
        otel_metrics = self.convert_to_otel_format(raw_metrics)

        # Return as MetricsData
        return MetricsData(resource_metrics=otel_metrics)
```
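
Steps 3 and 4 of the pattern could be approached along the following lines. This is a minimal sketch in plain Python rather than SDK code: `collect_within_deadline`, `healthy_source`, and `broken_source` are hypothetical names, and each fetcher is assumed to accept the remaining time budget in seconds.

```python
import logging
import time
from typing import Callable, List

logger = logging.getLogger(__name__)


def collect_within_deadline(
    fetchers: List[Callable[[float], list]],
    timeout_millis: float = 10_000,
) -> list:
    """Run each fetcher with the remaining time budget; skip the ones that fail."""
    deadline = time.monotonic() + timeout_millis / 1000
    results: list = []
    for fetch in fetchers:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # budget exhausted: return what has been gathered so far
        try:
            results.extend(fetch(remaining))
        except Exception:
            # A failing source is logged and skipped so it cannot abort collection.
            logger.warning("metric source failed", exc_info=True)
    return results


def healthy_source(timeout_seconds: float) -> list:
    return [("api_calls_total", 9876)]


def broken_source(timeout_seconds: float) -> list:
    raise ConnectionError("source unreachable")


collected = collect_within_deadline([broken_source, healthy_source])
```

A `produce()` implementation could call a helper like this and convert whatever was collected, so that one unreachable source yields a partial result instead of an exception.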

## Best Practices

1. **Resource Identification**: Use appropriate resource attributes to identify the source system
2. **Instrumentation Scope**: Create meaningful instrumentation scopes for your producers
3. **Metric Naming**: Use clear, descriptive metric names, optionally prefixed with the source system (for example `prometheus.` or `custom.`, as in `basic_example.py`)
4. **Error Handling**: Handle network errors, parsing errors, and timeouts gracefully
5. **Performance**: Consider caching and efficient data fetching to avoid impacting collection performance
6. **Thread Safety**: Ensure your producer is thread-safe as it may be called concurrently
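
One possible way to combine the caching and thread-safety advice is a small lock-protected cache in front of the expensive fetch. The sketch below uses only the standard library; `CachedFetcher` and `slow_fetch` are hypothetical names, and the TTL value is an arbitrary assumption.

```python
import threading
import time
from typing import Callable, Optional


class CachedFetcher:
    """Cache the result of an expensive fetch for `ttl_seconds` (hypothetical helper)."""

    def __init__(self, fetch: Callable[[], list], ttl_seconds: float = 5.0):
        self._fetch = fetch
        self._ttl_seconds = ttl_seconds
        self._lock = threading.Lock()
        self._cached: Optional[list] = None
        self._expires_at = 0.0

    def get(self) -> list:
        # The lock serializes concurrent callers, so the source is queried
        # at most once per TTL window.
        with self._lock:
            now = time.monotonic()
            if self._cached is None or now >= self._expires_at:
                self._cached = self._fetch()
                self._expires_at = now + self._ttl_seconds
            # Return a copy so callers cannot mutate the cached list.
            return list(self._cached)


calls = []


def slow_fetch() -> list:
    calls.append(1)  # record how often the source is actually queried
    return [("database_connections", 25)]


fetcher = CachedFetcher(slow_fetch, ttl_seconds=60.0)
threads = [threading.Thread(target=fetcher.get) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Holding the lock during the fetch keeps the sketch simple, at the cost of blocking other callers while the source is queried. Whether that trade-off is acceptable depends on how slow the source is.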

## Integration with MetricReader

```python
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter

# Create your producers
producer1 = MyCustomProducer()
producer2 = PrometheusProducer()

# Create a reader with producers
reader = PeriodicExportingMetricReader(
    exporter=ConsoleMetricExporter(),
    metric_producers=[producer1, producer2],
)

# The reader will automatically collect from both SDK and producers
```
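
Conceptually, each collection cycle merges what the SDK has aggregated with whatever every registered producer returns. The plain-Python sketch below only illustrates that idea. It is not the reader's actual implementation, and `collect_all` with its dictionary-shaped metrics is a hypothetical stand-in.

```python
from typing import Callable, List


def collect_all(
    sdk_metrics: List[dict],
    producers: List[Callable[[], List[dict]]],
) -> List[dict]:
    """Merge SDK metrics with the output of every producer (conceptual only)."""
    combined = list(sdk_metrics)
    for produce in producers:
        combined.extend(produce())
    return combined


sdk_metrics = [{"name": "example.counter", "value": 1}]
producers = [
    lambda: [{"name": "prometheus.http_requests_total", "value": 12345}],
    lambda: [{"name": "custom.api_calls_total", "value": 9876}],
]
batch = collect_all(sdk_metrics, producers)
```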

## Relationship to OpenTelemetry Instruments

MetricProducer is different from OpenTelemetry instruments:

- **Instruments** (Counter, Histogram, etc.): Collect raw measurements and aggregate them in the SDK
- **MetricProducer**: Provides already-aggregated metrics from external sources

Use MetricProducer when you have an existing system that already aggregates metrics and you want to bridge that data into OpenTelemetry.
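
The difference can be pictured with a few lines of plain Python. The data below is made up for illustration: the first half mimics what an SDK does with raw measurements, while the second half is the kind of ready-made total an external system would hand to a producer.

```python
from collections import defaultdict

# Instrument-style: every raw measurement is recorded, then aggregated.
raw_measurements = [("GET", 1), ("GET", 1), ("POST", 1)]
aggregated = defaultdict(int)
for method, increment in raw_measurements:
    aggregated[method] += increment

# Producer-style: the external system already holds the totals, so the
# producer only converts the snapshot; it never sees individual measurements.
external_snapshot = {"GET": 2, "POST": 1}
```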

## Further Reading

- [OpenTelemetry Metrics Specification](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricproducer)
- [OpenTelemetry Python SDK Documentation](https://opentelemetry-python.readthedocs.io/)
268 changes: 268 additions & 0 deletions docs/examples/metrics/producer/basic_example.py
@@ -0,0 +1,268 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This example demonstrates how to implement and use a MetricProducer to
bridge third-party metric sources with OpenTelemetry.

MetricProducer allows you to integrate pre-processed metrics from external
systems (like Prometheus, custom monitoring systems, etc.) into the
OpenTelemetry metrics pipeline.
"""

import time
from typing import Dict

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    ConsoleMetricExporter,
    Metric,
    MetricProducer,
    MetricsData,
    NumberDataPoint,
    PeriodicExportingMetricReader,
    ResourceMetrics,
    ScopeMetrics,
    Sum,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope


class PrometheusMetricProducer(MetricProducer):
    """Example MetricProducer that bridges Prometheus metrics.

    This example shows how to fetch metrics from a third-party source
    (simulating Prometheus) and convert them to OpenTelemetry format.
    """

    def __init__(self, prometheus_url: str = "http://localhost:9090"):
        self.prometheus_url = prometheus_url
        self.instrumentation_scope = InstrumentationScope(
            name="prometheus.bridge",
            version="1.0.0",
        )
        self.resource = Resource.create({
            "service.name": "prometheus-bridge",
            "bridge.source": "prometheus",
            "bridge.url": prometheus_url,
        })

    def produce(self, timeout_millis: float = 10_000) -> MetricsData:
        """Fetch metrics from Prometheus and convert to OpenTelemetry format."""

        # In a real implementation, you would:
        # 1. Make HTTP request to Prometheus /api/v1/query_range or /metrics
        # 2. Parse the response (JSON or Prometheus text format)
        # 3. Convert to OpenTelemetry metrics

        # For this example, we'll simulate fetching metrics
        simulated_prometheus_metrics = self._fetch_prometheus_metrics()

        # Take one integer timestamp so every data point shares the same
        # collection time without float rounding.
        now_ns = time.time_ns()

        # Convert to OpenTelemetry format
        otel_metrics = []
        for metric_name, metric_data in simulated_prometheus_metrics.items():
            otel_metrics.append(
                Metric(
                    name=f"prometheus.{metric_name}",
                    description=f"Metric {metric_name} from Prometheus",
                    unit=metric_data.get("unit", "1"),
                    data=Sum(
                        data_points=[
                            NumberDataPoint(
                                attributes=metric_data.get("labels", {}),
                                start_time_unix_nano=now_ns - 60 * 10**9,  # 1 minute ago
                                time_unix_nano=now_ns,
                                value=metric_data["value"],
                            )
                        ],
                        # Use the enum: the integer 1 is DELTA, not CUMULATIVE.
                        aggregation_temporality=AggregationTemporality.CUMULATIVE,
                        is_monotonic=metric_data.get("monotonic", False),
                    ),
                )
            )

        # Return as MetricsData
        return MetricsData(
            resource_metrics=[
                ResourceMetrics(
                    resource=self.resource,
                    scope_metrics=[
                        ScopeMetrics(
                            scope=self.instrumentation_scope,
                            metrics=otel_metrics,
                            schema_url="",
                        )
                    ],
                    schema_url="",
                )
            ]
        )

    def _fetch_prometheus_metrics(self) -> Dict[str, Dict]:
        """Simulate fetching metrics from Prometheus."""
        # In a real implementation, this would make HTTP requests to Prometheus
        # and parse the response. For this example, we return simulated data.

        return {
            "http_requests_total": {
                "value": 12345,
                "labels": {"method": "GET", "status": "200"},
                "unit": "1",
                "monotonic": True,
            },
            "http_request_duration_seconds": {
                "value": 0.234,
                "labels": {"method": "GET", "quantile": "0.95"},
                "unit": "s",
                "monotonic": False,
            },
            "memory_usage_bytes": {
                "value": 1024 * 1024 * 512,  # 512 MB
                "labels": {"instance": "server-1"},
                "unit": "bytes",
                "monotonic": False,
            },
        }


class CustomSystemMetricProducer(MetricProducer):
    """Example MetricProducer for a custom monitoring system."""

    def __init__(self, system_name: str = "custom-system"):
        self.system_name = system_name
        self.instrumentation_scope = InstrumentationScope(
            name=f"{system_name}.bridge",
            version="1.0.0",
        )
        self.resource = Resource.create({
            "service.name": f"{system_name}-bridge",
            "bridge.source": system_name,
        })

    def produce(self, timeout_millis: float = 10_000) -> MetricsData:
        """Fetch metrics from custom system."""

        # Simulate fetching from a custom system
        custom_metrics = self._fetch_custom_metrics()

        # Take one integer timestamp so every data point shares the same
        # collection time without float rounding.
        now_ns = time.time_ns()

        # Convert to OpenTelemetry format
        otel_metrics = []
        for metric in custom_metrics:
            otel_metrics.append(
                Metric(
                    name=f"custom.{metric['name']}",
                    description=metric.get("description", ""),
                    unit=metric.get("unit", "1"),
                    data=Sum(
                        data_points=[
                            NumberDataPoint(
                                attributes=metric.get("tags", {}),
                                start_time_unix_nano=now_ns - 30 * 10**9,  # 30 seconds ago
                                time_unix_nano=now_ns,
                                value=metric["value"],
                            )
                        ],
                        # Use the enum: the integer 1 is DELTA, not CUMULATIVE.
                        aggregation_temporality=AggregationTemporality.CUMULATIVE,
                        is_monotonic=metric.get("is_counter", False),
                    ),
                )
            )

        return MetricsData(
            resource_metrics=[
                ResourceMetrics(
                    resource=self.resource,
                    scope_metrics=[
                        ScopeMetrics(
                            scope=self.instrumentation_scope,
                            metrics=otel_metrics,
                            schema_url="",
                        )
                    ],
                    schema_url="",
                )
            ]
        )

    def _fetch_custom_metrics(self) -> list:
        """Simulate fetching from a custom monitoring system."""
        return [
            {
                "name": "database_connections",
                "value": 25,
                "description": "Active database connections",
                "unit": "1",
                "tags": {"database": "postgres", "pool": "main"},
                "is_counter": False,
            },
            {
                "name": "api_calls_total",
                "value": 9876,
                "description": "Total API calls processed",
                "unit": "1",
                "tags": {"endpoint": "/api/v1/users", "method": "GET"},
                "is_counter": True,
            },
        ]


def main():
    """Example usage of MetricProducer with OpenTelemetry."""

    print("Starting MetricProducer example...")

    # Create MetricProducers for different third-party sources
    prometheus_producer = PrometheusMetricProducer("http://localhost:9090")
    custom_producer = CustomSystemMetricProducer("monitoring-system")

    # Create a metric reader that includes the producers
    exporter = ConsoleMetricExporter()
    reader = PeriodicExportingMetricReader(
        exporter=exporter,
        export_interval_millis=5000,  # Export every 5 seconds
        metric_producers=[prometheus_producer, custom_producer],
    )

    # IMPORTANT: Register the reader with a MeterProvider
    # This is required for the reader to be able to collect metrics
    meter_provider = MeterProvider(metric_readers=[reader])

    print("Configured MetricReader with the following producers:")
    print("- PrometheusMetricProducer (simulated)")
    print("- CustomSystemMetricProducer (simulated)")
    print("\nThe reader is now registered with a MeterProvider and will collect")
    print("metrics from these producers every 5 seconds and export them to the console.\n")

    # Note: You can also use the meter_provider to create meters and instruments
    # meter = meter_provider.get_meter("example.meter")
    # counter = meter.create_counter("example.counter")
    # counter.add(1)

    print("=== Metrics will be collected and exported every 5 seconds ===")
    print("Press Ctrl+C to stop...")

    try:
        # Let it run for a bit to show periodic collection
        time.sleep(20)
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        # Clean shutdown
        meter_provider.shutdown()
        print("MeterProvider shut down successfully.")


if __name__ == "__main__":
    main()