Skip to content

Commit ddbb286

Browse files
authored
fix(core): auto add ops metrcis (#3013) (#3014)
1 parent 91459b2 commit ddbb286

File tree

1 file changed

+10
-14
lines changed

1 file changed

+10
-14
lines changed

core/src/main/java/kafka/automq/AutoMQConfig.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import java.util.List;
3939
import java.util.Optional;
4040
import java.util.concurrent.TimeUnit;
41-
import java.util.stream.Collectors;
4241

4342
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
4443
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
@@ -405,13 +404,16 @@ private static String genWALConfig(KafkaConfig config) {
405404
private static String genMetricsExporterURI(KafkaConfig config) {
406405
Password pwd = config.getPassword(S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG);
407406
String uri = pwd == null ? null : pwd.value();
408-
if (StringUtils.isNotBlank(uri)) {
409-
return uri;
407+
if (uri == null) {
408+
uri = buildMetrixExporterURIWithOldConfigs(config);
410409
}
411-
return buildMetricsExporterUriFromLegacy(config);
410+
if (!uri.contains(TELEMETRY_EXPORTER_TYPE_OPS)) {
411+
uri += "," + buildOpsExporterURI();
412+
}
413+
return uri;
412414
}
413415

414-
private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig) {
416+
private static String buildMetrixExporterURIWithOldConfigs(KafkaConfig kafkaConfig) {
415417
if (!kafkaConfig.getBoolean(S3_METRICS_ENABLE_CONFIG)) {
416418
return "";
417419
}
@@ -428,9 +430,6 @@ private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig)
428430
case TELEMETRY_EXPORTER_TYPE_PROMETHEUS:
429431
exportedUris.add(buildPrometheusExporterURI(kafkaConfig));
430432
break;
431-
case TELEMETRY_EXPORTER_TYPE_OPS:
432-
exportedUris.add(buildS3ExporterURI());
433-
break;
434433
default:
435434
LOGGER.error("illegal metrics exporter type: {}", exporterType);
436435
break;
@@ -439,13 +438,10 @@ private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig)
439438
}
440439

441440
if (kafkaConfig.getBoolean(S3_TELEMETRY_OPS_ENABLED_CONFIG)) {
442-
exportedUris.add(buildS3ExporterURI());
441+
exportedUris.add(buildOpsExporterURI());
443442
}
444443

445-
return exportedUris.stream()
446-
.filter(StringUtils::isNotBlank)
447-
.distinct()
448-
.collect(Collectors.joining(","));
444+
return String.join(",", exportedUris);
449445
}
450446

451447
private static String buildOTLPExporterURI(KafkaConfig kafkaConfig) {
@@ -472,7 +468,7 @@ private static String buildPrometheusExporterURI(KafkaConfig kafkaConfig) {
472468
"port" + "=" + kafkaConfig.getInt(S3_METRICS_EXPORTER_PROM_PORT_CONFIG);
473469
}
474470

475-
private static String buildS3ExporterURI() {
471+
private static String buildOpsExporterURI() {
476472
return TELEMETRY_EXPORTER_TYPE_OPS + URI_DELIMITER;
477473
}
478474

0 commit comments

Comments
 (0)