Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions core/src/main/java/kafka/automq/AutoMQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
Expand Down Expand Up @@ -405,13 +404,16 @@ private static String genWALConfig(KafkaConfig config) {
private static String genMetricsExporterURI(KafkaConfig config) {
Password pwd = config.getPassword(S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG);
String uri = pwd == null ? null : pwd.value();
if (StringUtils.isNotBlank(uri)) {
return uri;
if (uri == null) {
uri = buildMetrixExporterURIWithOldConfigs(config);
}
return buildMetricsExporterUriFromLegacy(config);
if (!uri.contains(TELEMETRY_EXPORTER_TYPE_OPS)) {
uri += "," + buildOpsExporterURI();
}
return uri;
}

private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig) {
private static String buildMetrixExporterURIWithOldConfigs(KafkaConfig kafkaConfig) {
if (!kafkaConfig.getBoolean(S3_METRICS_ENABLE_CONFIG)) {
return "";
}
Expand All @@ -428,9 +430,6 @@ private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig)
case TELEMETRY_EXPORTER_TYPE_PROMETHEUS:
exportedUris.add(buildPrometheusExporterURI(kafkaConfig));
break;
case TELEMETRY_EXPORTER_TYPE_OPS:
exportedUris.add(buildS3ExporterURI());
break;
default:
LOGGER.error("illegal metrics exporter type: {}", exporterType);
break;
Expand All @@ -439,13 +438,10 @@ private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig)
}

if (kafkaConfig.getBoolean(S3_TELEMETRY_OPS_ENABLED_CONFIG)) {
exportedUris.add(buildS3ExporterURI());
exportedUris.add(buildOpsExporterURI());
}

return exportedUris.stream()
.filter(StringUtils::isNotBlank)
.distinct()
.collect(Collectors.joining(","));
return String.join(",", exportedUris);
}

private static String buildOTLPExporterURI(KafkaConfig kafkaConfig) {
Expand All @@ -472,7 +468,7 @@ private static String buildPrometheusExporterURI(KafkaConfig kafkaConfig) {
"port" + "=" + kafkaConfig.getInt(S3_METRICS_EXPORTER_PROM_PORT_CONFIG);
}

private static String buildS3ExporterURI() {
private static String buildOpsExporterURI() {
return TELEMETRY_EXPORTER_TYPE_OPS + URI_DELIMITER;
}

Expand Down
Loading