3838import java .util .List ;
3939import java .util .Optional ;
4040import java .util .concurrent .TimeUnit ;
41- import java .util .stream .Collectors ;
4241
4342import static org .apache .kafka .common .config .ConfigDef .Importance .HIGH ;
4443import static org .apache .kafka .common .config .ConfigDef .Importance .LOW ;
@@ -405,13 +404,16 @@ private static String genWALConfig(KafkaConfig config) {
405404 private static String genMetricsExporterURI (KafkaConfig config ) {
406405 Password pwd = config .getPassword (S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG );
407406 String uri = pwd == null ? null : pwd .value ();
408- if (StringUtils . isNotBlank ( uri ) ) {
409- return uri ;
407+ if (uri == null ) {
408+ uri = buildMetrixExporterURIWithOldConfigs ( config ) ;
410409 }
411- return buildMetricsExporterUriFromLegacy (config );
410+ if (!uri .contains (TELEMETRY_EXPORTER_TYPE_OPS )) {
411+ uri += "," + buildOpsExporterURI ();
412+ }
413+ return uri ;
412414 }
413415
414- private static String buildMetricsExporterUriFromLegacy (KafkaConfig kafkaConfig ) {
416+ private static String buildMetrixExporterURIWithOldConfigs (KafkaConfig kafkaConfig ) {
415417 if (!kafkaConfig .getBoolean (S3_METRICS_ENABLE_CONFIG )) {
416418 return "" ;
417419 }
@@ -428,9 +430,6 @@ private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig)
428430 case TELEMETRY_EXPORTER_TYPE_PROMETHEUS :
429431 exportedUris .add (buildPrometheusExporterURI (kafkaConfig ));
430432 break ;
431- case TELEMETRY_EXPORTER_TYPE_OPS :
432- exportedUris .add (buildS3ExporterURI ());
433- break ;
434433 default :
435434 LOGGER .error ("illegal metrics exporter type: {}" , exporterType );
436435 break ;
@@ -439,13 +438,10 @@ private static String buildMetricsExporterUriFromLegacy(KafkaConfig kafkaConfig)
439438 }
440439
441440 if (kafkaConfig .getBoolean (S3_TELEMETRY_OPS_ENABLED_CONFIG )) {
442- exportedUris .add (buildS3ExporterURI ());
441+ exportedUris .add (buildOpsExporterURI ());
443442 }
444443
445- return exportedUris .stream ()
446- .filter (StringUtils ::isNotBlank )
447- .distinct ()
448- .collect (Collectors .joining ("," ));
444+ return String .join ("," , exportedUris );
449445 }
450446
451447 private static String buildOTLPExporterURI (KafkaConfig kafkaConfig ) {
@@ -472,7 +468,7 @@ private static String buildPrometheusExporterURI(KafkaConfig kafkaConfig) {
472468 "port" + "=" + kafkaConfig .getInt (S3_METRICS_EXPORTER_PROM_PORT_CONFIG );
473469 }
474470
475- private static String buildS3ExporterURI () {
471+ private static String buildOpsExporterURI () {
476472 return TELEMETRY_EXPORTER_TYPE_OPS + URI_DELIMITER ;
477473 }
478474
0 commit comments