Skip to content

Commit 2e542e9

Browse files
Handle non-numeric metric values (#21)
Signed-off-by: Owen <[email protected]>
1 parent 3c6d761 commit 2e542e9

File tree

6 files changed

+175
-85
lines changed

6 files changed

+175
-85
lines changed

src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@
1212
import org.slf4j.LoggerFactory;
1313

1414
import java.util.ArrayList;
15-
import java.util.LinkedHashMap;
1615
import java.util.List;
1716
import java.util.Locale;
1817
import java.util.Map;
1918
import java.util.concurrent.ConcurrentHashMap;
20-
import java.util.stream.Collectors;
2119

2220
/**
2321
* Prometheus Collector to store and export metrics retrieved by the reporters.
@@ -59,15 +57,14 @@ public List<MetricFamilySamples> collect() {
5957
KafkaMetric kafkaMetric = entry.getValue();
6058
LOG.trace("Collecting Kafka metric {}", metricName);
6159

62-
String name = metricName(metricName);
60+
String prometheusMetricName = metricName(metricName);
6361
// TODO Filtering should take labels into account
64-
if (!config.isAllowed(name)) {
65-
LOG.info("Kafka metric {} is not allowed", name);
62+
if (!config.isAllowed(prometheusMetricName)) {
63+
LOG.info("Kafka metric {} is not allowed", prometheusMetricName);
6664
continue;
6765
}
68-
LOG.info("Kafka metric {} is allowed", name);
69-
LOG.info("labels " + metricName.tags());
70-
MetricFamilySamples sample = convert(name, metricName.description(), kafkaMetric, metricName.tags());
66+
LOG.info("Kafka metric {} is allowed", prometheusMetricName);
67+
MetricFamilySamples sample = convert(prometheusMetricName, kafkaMetric, metricName);
7168
if (sample != null) {
7269
samples.add(sample);
7370
}
@@ -109,24 +106,20 @@ String metricName(MetricName metricName) {
109106
return prefix + '_' + group + '_' + name;
110107
}
111108

112-
static MetricFamilySamples convert(String name, String help, KafkaMetric metric, Map<String, String> labels) {
113-
Object value = metric.metricValue();
114-
if (!(value instanceof Number)) {
115-
// Prometheus only accepts numeric metrics.
116-
// Kafka gauges can have arbitrary types, so skip them for now
117-
// TODO move non-numeric values to labels
118-
return null;
109+
private static MetricFamilySamples convert(String prometheusMetricName, KafkaMetric metric, MetricName metricName) {
110+
Map<String, String> sanitizedLabels = MetricFamilySamplesBuilder.sanitizeLabels(metricName.tags());
111+
Object valueObj = metric.metricValue();
112+
double value;
113+
if (valueObj instanceof Number) {
114+
value = ((Number) valueObj).doubleValue();
115+
} else {
116+
value = 1.0;
117+
String attributeName = metricName.name();
118+
sanitizedLabels.put(Collector.sanitizeMetricName(attributeName), String.valueOf(valueObj));
119119
}
120-
Map<String, String> sanitizedLabels = labels.entrySet().stream()
121-
.collect(Collectors.toMap(
122-
e -> Collector.sanitizeMetricName(e.getKey()),
123-
Map.Entry::getValue,
124-
(v1, v2) -> {
125-
throw new IllegalStateException("Unexpected duplicate key " + v1);
126-
},
127-
LinkedHashMap::new));
128-
return new MetricFamilySamplesBuilder(Type.GAUGE, help)
129-
.addSample(name, ((Number) value).doubleValue(), sanitizedLabels)
130-
.build();
120+
121+
return new MetricFamilySamplesBuilder(Type.GAUGE, metric.metricName().description())
122+
.addSample(prometheusMetricName, value, sanitizedLabels)
123+
.build();
131124
}
132125
}

src/main/java/io/strimzi/kafka/metrics/MetricFamilySamplesBuilder.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,23 @@
66

77
import com.yammer.metrics.stats.Snapshot;
88
import io.prometheus.client.Collector;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
911

1012
import java.util.ArrayList;
1113
import java.util.Arrays;
1214
import java.util.HashMap;
15+
import java.util.LinkedHashMap;
1316
import java.util.List;
1417
import java.util.Map;
18+
import java.util.stream.Collectors;
1519

1620
/**
1721
* Helper class to convert Kafka metrics into the Prometheus format.
1822
*/
1923
public class MetricFamilySamplesBuilder {
2024

25+
private static final Logger LOG = LoggerFactory.getLogger(MetricFamilySamplesBuilder.class.getName());
2126
private final Collector.Type type;
2227
private final String help;
2328
private final List<Collector.MetricFamilySamples.Sample> samples;
@@ -58,4 +63,25 @@ Collector.MetricFamilySamples build() {
5863
}
5964
return new Collector.MetricFamilySamples(samples.get(0).name, type, help, samples);
6065
}
66+
67+
/**
68+
* Sanitizes the given map of labels by replacing any characters in the label keys
69+
* that are not allowed in Prometheus metric names with an underscore ('_').
70+
* If two keys become identical after sanitization, a warning is logged and the value encountered first is retained.
71+
*
72+
* @param labels The map of labels to be sanitized. The keys of this map are label names,
73+
* and the values are label values.
74+
* @return A new map with sanitized label names and the same label values.
75+
*/
76+
public static Map<String, String> sanitizeLabels(Map<String, String> labels) {
77+
return labels.entrySet().stream()
78+
.collect(Collectors.toMap(
79+
e -> Collector.sanitizeMetricName(e.getKey()),
80+
Map.Entry::getValue,
81+
(v1, v2) -> {
82+
LOG.warn("Ignoring metric value duplicate key {}", v1);
83+
return v1;
84+
},
85+
LinkedHashMap::new));
86+
}
6187
}

src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -58,27 +58,27 @@ public List<MetricFamilySamples> collect() {
5858
Metric metric = entry.getValue();
5959
LOG.trace("Collecting Yammer metric {}", metricName);
6060

61-
String name = metricName(metricName);
61+
String prometheusMetricName = metricName(metricName);
6262
// TODO Filtering should take labels into account
63-
if (!config.isAllowed(name)) {
64-
LOG.info("Yammer metric {} is not allowed", name);
63+
if (!config.isAllowed(prometheusMetricName)) {
64+
LOG.info("Yammer metric {} is not allowed", prometheusMetricName);
6565
continue;
6666
}
67-
LOG.info("Yammer metric {} is allowed", name);
67+
LOG.info("Yammer metric {} is allowed", prometheusMetricName);
6868
Map<String, String> labels = labelsFromScope(metricName.getScope());
69-
LOG.info("labels " + labels);
69+
LOG.info("labels {} ", labels);
7070

7171
MetricFamilySamples sample = null;
7272
if (metric instanceof Counter) {
73-
sample = convert(name, (Counter) metric, labels);
73+
sample = convert(prometheusMetricName, (Counter) metric, labels);
7474
} else if (metric instanceof Gauge) {
75-
sample = convert(name, (Gauge<?>) metric, labels);
75+
sample = convert(prometheusMetricName, (Gauge<?>) metric, labels, metricName);
7676
} else if (metric instanceof Histogram) {
77-
sample = convert(name, (Histogram) metric, labels);
77+
sample = convert(prometheusMetricName, (Histogram) metric, labels);
7878
} else if (metric instanceof Meter) {
79-
sample = convert(name, (Meter) metric, labels);
79+
sample = convert(prometheusMetricName, (Meter) metric, labels);
8080
} else if (metric instanceof Timer) {
81-
sample = convert(name, (Timer) metric, labels);
81+
sample = convert(prometheusMetricName, (Timer) metric, labels);
8282
} else {
8383
LOG.error("The metric " + metric.getClass().getName() + " has an unexpected type.");
8484
}
@@ -114,41 +114,46 @@ static Map<String, String> labelsFromScope(String scope) {
114114
return Collections.emptyMap();
115115
}
116116

117-
static MetricFamilySamples convert(String name, Counter counter, Map<String, String> labels) {
117+
static MetricFamilySamples convert(String prometheusMetricName, Counter counter, Map<String, String> labels) {
118118
return new MetricFamilySamplesBuilder(Type.GAUGE, "")
119-
.addSample(name + "_count", counter.count(), labels)
119+
.addSample(prometheusMetricName + "_count", counter.count(), labels)
120120
.build();
121121
}
122122

123-
static MetricFamilySamples convert(String name, Gauge<?> gauge, Map<String, String> labels) {
124-
Object value = gauge.value();
125-
if (!(value instanceof Number)) {
126-
// Prometheus only accepts numeric metrics.
127-
// Some Kafka gauges have string values (for example kafka.server:type=KafkaServer,name=ClusterId), so skip them
128-
return null;
123+
private static MetricFamilySamples convert(String prometheusMetricName, Gauge<?> gauge, Map<String, String> labels, MetricName metricName) {
124+
Map<String, String> sanitizedLabels = MetricFamilySamplesBuilder.sanitizeLabels(labels);
125+
Object valueObj = gauge.value();
126+
double value;
127+
if (valueObj instanceof Number) {
128+
value = ((Number) valueObj).doubleValue();
129+
} else {
130+
value = 1.0;
131+
String attributeName = metricName.getName();
132+
sanitizedLabels.put(Collector.sanitizeMetricName(attributeName), String.valueOf(valueObj));
129133
}
134+
130135
return new MetricFamilySamplesBuilder(Type.GAUGE, "")
131-
.addSample(name, ((Number) value).doubleValue(), labels)
136+
.addSample(prometheusMetricName, value, sanitizedLabels)
132137
.build();
133138
}
134139

135-
static MetricFamilySamples convert(String name, Meter meter, Map<String, String> labels) {
140+
static MetricFamilySamples convert(String prometheusMetricName, Meter meter, Map<String, String> labels) {
136141
return new MetricFamilySamplesBuilder(Type.COUNTER, "")
137-
.addSample(name + "_count", meter.count(), labels)
142+
.addSample(prometheusMetricName + "_count", meter.count(), labels)
138143
.build();
139144
}
140145

141-
static MetricFamilySamples convert(String name, Histogram histogram, Map<String, String> labels) {
146+
static MetricFamilySamples convert(String prometheusMetricName, Histogram histogram, Map<String, String> labels) {
142147
return new MetricFamilySamplesBuilder(Type.SUMMARY, "")
143-
.addSample(name + "_count", histogram.count(), labels)
144-
.addQuantileSamples(name, histogram.getSnapshot(), labels)
148+
.addSample(prometheusMetricName + "_count", histogram.count(), labels)
149+
.addQuantileSamples(prometheusMetricName, histogram.getSnapshot(), labels)
145150
.build();
146151
}
147152

148-
static MetricFamilySamples convert(String name, Timer metric, Map<String, String> labels) {
153+
static MetricFamilySamples convert(String prometheusMetricName, Timer metric, Map<String, String> labels) {
149154
return new MetricFamilySamplesBuilder(Type.SUMMARY, "")
150-
.addSample(name + "_count", metric.count(), labels)
151-
.addQuantileSamples(name, metric.getSnapshot(), labels)
155+
.addSample(prometheusMetricName + "_count", metric.count(), labels)
156+
.addQuantileSamples(prometheusMetricName, metric.getSnapshot(), labels)
152157
.build();
153158
}
154159
}

src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616

1717
import java.util.ArrayList;
1818
import java.util.HashMap;
19+
import java.util.LinkedHashMap;
1920
import java.util.List;
2021
import java.util.Map;
2122

2223
import static org.junit.jupiter.api.Assertions.assertEquals;
2324
import static org.junit.jupiter.api.Assertions.assertTrue;
2425

25-
2626
public class KafkaMetricsCollectorTest {
2727

2828
private final MetricConfig metricConfig = new MetricConfig();
@@ -36,7 +36,7 @@ public void setup() {
3636
}
3737

3838
@Test
39-
public void testCollect() {
39+
public void testMetricLifecycle() {
4040
Map<String, String> props = new HashMap<>();
4141
props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*");
4242
PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props);
@@ -51,35 +51,65 @@ public void testCollect() {
5151
metrics = collector.collect();
5252
assertTrue(metrics.isEmpty());
5353

54-
// Adding a non-numeric metric does nothing
55-
collector.addMetric(buildNonNumericMetric("name2", "group"));
56-
metrics = collector.collect();
57-
assertTrue(metrics.isEmpty());
58-
5954
// Adding a metric that matches the allowlist
6055
collector.addMetric(buildMetric("name", "group", 1.0));
6156
metrics = collector.collect();
6257
assertEquals(1, metrics.size());
63-
assertEquals("kafka_server_group_name", metrics.get(0).name);
64-
assertEquals(1, metrics.get(0).samples.size());
65-
assertEquals(1.0, metrics.get(0).samples.get(0).value, 0.1);
66-
assertEquals(new ArrayList<>(labels.keySet()), metrics.get(0).samples.get(0).labelNames);
67-
assertEquals(new ArrayList<>(labels.values()), metrics.get(0).samples.get(0).labelValues);
58+
59+
Collector.MetricFamilySamples metricFamilySamples = metrics.get(0);
60+
assertMetricFamilySample(metricFamilySamples, 1.0, labels);
6861

6962
// Adding the same metric updates its value
7063
collector.addMetric(buildMetric("name", "group", 3.0));
7164
metrics = collector.collect();
7265
assertEquals(1, metrics.size());
73-
assertEquals("kafka_server_group_name", metrics.get(0).name);
74-
assertEquals(1, metrics.get(0).samples.size());
75-
assertEquals(3.0, metrics.get(0).samples.get(0).value, 0.1);
66+
67+
Collector.MetricFamilySamples updatedMetrics = metrics.get(0);
68+
assertMetricFamilySample(updatedMetrics, 3.0, labels);
7669

7770
// Removing the metric
7871
collector.removeMetric(buildMetric("name", "group", 4.0));
7972
metrics = collector.collect();
8073
assertTrue(metrics.isEmpty());
8174
}
8275

76+
@Test
77+
public void testCollectNonNumericMetric() {
78+
Map<String, String> props = new HashMap<>();
79+
props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*");
80+
PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props);
81+
KafkaMetricsCollector collector = new KafkaMetricsCollector(config);
82+
collector.setPrefix("kafka.server");
83+
84+
List<Collector.MetricFamilySamples> metrics = collector.collect();
85+
assertTrue(metrics.isEmpty());
86+
87+
// Adding a non-numeric metric: its value is converted into a label and the sample value is 1.0
88+
String nonNumericValue = "myValue";
89+
KafkaMetric nonNumericMetric = buildNonNumericMetric("name", "group", nonNumericValue);
90+
collector.addMetric(nonNumericMetric);
91+
metrics = collector.collect();
92+
93+
Map<String, String> expectedLabels = new LinkedHashMap<>(labels);
94+
expectedLabels.put("name", nonNumericValue);
95+
assertEquals(1, metrics.size());
96+
97+
Collector.MetricFamilySamples metricFamilySamples = metrics.get(0);
98+
99+
assertEquals("kafka_server_group_name", metricFamilySamples.name);
100+
assertMetricFamilySample(metricFamilySamples, 1.0, expectedLabels);
101+
}
102+
103+
private void assertMetricFamilySample(Collector.MetricFamilySamples actual, double expectedValue, Map<String, String> expectedLabels) {
104+
105+
Collector.MetricFamilySamples.Sample actualSample = actual.samples.get(0);
106+
107+
assertEquals(1, actual.samples.size());
108+
assertEquals(actualSample.value, expectedValue, 0.1);
109+
assertEquals(new ArrayList<>(expectedLabels.keySet()), actualSample.labelNames);
110+
assertEquals(new ArrayList<>(expectedLabels.values()), actualSample.labelValues);
111+
}
112+
83113
private KafkaMetric buildMetric(String name, String group, double value) {
84114
Measurable measurable = (config, now) -> value;
85115
return new KafkaMetric(
@@ -90,8 +120,8 @@ private KafkaMetric buildMetric(String name, String group, double value) {
90120
time);
91121
}
92122

93-
private KafkaMetric buildNonNumericMetric(String name, String group) {
94-
Gauge<String> measurable = (config, now) -> "hello";
123+
private KafkaMetric buildNonNumericMetric(String name, String group, String value) {
124+
Gauge<String> measurable = (config, now) -> value;
95125
return new KafkaMetric(
96126
new Object(),
97127
new MetricName(name, group, "", labels),

src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ public void testLifeCycle() throws Exception {
6363
KafkaMetric metric3 = buildNonNumericMetric("name3", "group");
6464
reporter.metricChange(metric3);
6565
metrics = getMetrics(port);
66-
assertEquals(initialMetrics + 2, metrics.size());
66+
assertEquals(initialMetrics + 3, metrics.size());
6767

6868
reporter.metricRemoval(metric1);
6969
metrics = getMetrics(port);
70-
assertEquals(initialMetrics + 1, metrics.size());
70+
assertEquals(initialMetrics + 2, metrics.size());
7171

7272
reporter.close();
7373
}

0 commit comments

Comments
 (0)