diff --git a/core/src/main/java/kafka/automq/AutoMQConfig.java b/core/src/main/java/kafka/automq/AutoMQConfig.java index 5694942323..cd4c04e75f 100644 --- a/core/src/main/java/kafka/automq/AutoMQConfig.java +++ b/core/src/main/java/kafka/automq/AutoMQConfig.java @@ -38,7 +38,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; @@ -405,13 +404,16 @@ private static String genWALConfig(KafkaConfig config) { private static String genMetricsExporterURI(KafkaConfig config) { Password pwd = config.getPassword(S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG); String uri = pwd == null ? null : pwd.value(); - if (StringUtils.isNotBlank(uri)) { - return uri; + if (uri == null) { + uri = buildMetrixExporterURIWithOldConfigs(config); } - return buildMetricsExporterUriFromLegacy(config); + if (!uri.contains(TELEMETRY_EXPORTER_TYPE_OPS)) { + uri += "," + buildOpsExporterURI(); + } + return uri; } - private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig) { + private static String buildMetrixExporterURIWithOldConfigs(KafkaConfig kafkaConfig) { if (!kafkaConfig.getBoolean(S3_METRICS_ENABLE_CONFIG)) { return ""; } @@ -428,9 +430,6 @@ private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig) case TELEMETRY_EXPORTER_TYPE_PROMETHEUS: exportedUris.add(buildPrometheusExporterURI(kafkaConfig)); break; - case TELEMETRY_EXPORTER_TYPE_OPS: - exportedUris.add(buildS3ExporterURI()); - break; default: LOGGER.error("illegal metrics exporter type: {}", exporterType); break; @@ -439,13 +438,10 @@ private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig) } if (kafkaConfig.getBoolean(S3_TELEMETRY_OPS_ENABLED_CONFIG)) { - exportedUris.add(buildS3ExporterURI()); + exportedUris.add(buildOpsExporterURI()); } - return exportedUris.stream() - .filter(StringUtils::isNotBlank) - .distinct() - .collect(Collectors.joining(",")); + return String.join(",", exportedUris); } private static String buildOTLPExporterURI(KafkaConfig kafkaConfig) { @@ -472,7 +468,7 @@ private static String buildPrometheusExporterURI(KafkaConfig kafkaConfig) { "port" + "=" + kafkaConfig.getInt(S3_METRICS_EXPORTER_PROM_PORT_CONFIG); } - private static String buildS3ExporterURI() { + private static String buildOpsExporterURI() { return TELEMETRY_EXPORTER_TYPE_OPS + URI_DELIMITER; }