4040import org .apache .kafka .storage .internals .log .CleanerConfig ;
4141import org .jboss .logging .Logger ;
4242
43- import kafka .cluster .EndPoint ;
4443import kafka .server .KafkaConfig ;
4544import kafka .server .KafkaRaftServer ;
4645import scala .jdk .javaapi .StreamConverters ;
@@ -254,7 +253,7 @@ public String getAdvertisedListeners() {
254253 }
255254
256255 public List <String > getLogDirs () {
257- return getLogDirs ( config );
256+ return config . logDirs ( );
258257 }
259258
260259 public int getNodeId () {
@@ -283,7 +282,7 @@ public static Endpoint endpoint(String listener, SecurityProtocol protocol, Stri
283282
284283 public static Endpoint parseEndpoint (SecurityProtocol protocol , String listenerStr ) {
285284 Endpoint endpoint = parseEndpoint (listenerStr );
286- return new Endpoint (endpoint . listenerName (). orElse ( protocol . name ), protocol , endpoint .host (), endpoint .port ());
285+ return new Endpoint (listenerName (endpoint ), protocol , endpoint .host (), endpoint .port ());
287286 }
288287
289288 public static Endpoint parseEndpoint (String listenerStr ) {
@@ -311,7 +310,7 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll
311310
312311 // Configure listeners
313312 Map <String , Endpoint > listeners = advertisedListeners .stream ()
314- .map (l -> new Endpoint (l .listenerName (). orElse ( null ), l .securityProtocol (), "" , l .port ()))
313+ .map (l -> new Endpoint (l .listener ( ), l .securityProtocol (), "" , l .port ()))
315314 .collect (Collectors .toMap (EmbeddedKafkaBroker ::listenerName , Function .identity ()));
316315 listeners .put (listenerName (controller ), controller );
317316 listeners .put (listenerName (internalEndpoint ), internalEndpoint );
@@ -372,7 +371,7 @@ public static KafkaConfig formatStorageFromConfig(Properties properties, String
372371 .setNodeId (config .nodeId ())
373372 .setUnstableFeatureVersionsEnabled (config .unstableFeatureVersionsEnabled ())
374373 .setIgnoreFormatted (ignoreFormatted )
375- .setControllerListenerName (config .controllerListenerNames ().head ( ))
374+ .setControllerListenerName (config .controllerListenerNames ().get ( 0 ))
376375 .setMetadataLogDirectory (config .metadataLogDir ());
377376 configToLogDirectories (config ).forEach (formatter ::addDirectory );
378377 try {
@@ -384,8 +383,7 @@ public static KafkaConfig formatStorageFromConfig(Properties properties, String
384383 }
385384
386385 static Set <String > configToLogDirectories (KafkaConfig config ) {
387- TreeSet <String > dirs = new TreeSet <>();
388- config .logDirs ().foreach (dirs ::add );
386+ TreeSet <String > dirs = new TreeSet <>(config .logDirs ());
389387 String metadataLogDir = config .metadataLogDir ();
390388 if (metadataLogDir != null ) {
391389 dirs .add (metadataLogDir );
@@ -422,15 +420,10 @@ public static KafkaRaftServer createServer(final KafkaConfig config) {
422420
423421 private static String getAdvertisedListeners (KafkaConfig config ) {
424422 return StreamConverters .asJavaParStream (config .effectiveAdvertisedBrokerListeners ())
425- .map (EndPoint :: connectionString )
423+ .map (EmbeddedKafkaBroker :: toListenerString )
426424 .collect (Collectors .joining ("," ));
427425 }
428426
429- private static List <String > getLogDirs (KafkaConfig config ) {
430- return StreamConverters .asJavaParStream (config .logDirs ())
431- .collect (Collectors .toList ());
432- }
433-
434427 private static int getUnusedPort (int port ) {
435428 if (port != 0 ) {
436429 return port ;
@@ -464,7 +457,11 @@ private static String toProtocolMap(Endpoint endpoint) {
464457 }
465458
466459 private static String listenerName (Endpoint endpoint ) {
467- return endpoint .listenerName ().orElse (endpoint .securityProtocol ().name );
460+ String listener = endpoint .listener ();
461+ if (listener != null && !listener .isBlank ()) {
462+ return listener ;
463+ }
464+ return endpoint .securityProtocol ().name ;
468465 }
469466
470467 public static class LoggingOutputStream extends java .io .OutputStream {
0 commit comments