Skip to content

Commit 804f3a9

Browse files
committed
Kafka 4.1.0 removes kafka.cluster.EndPoint
1 parent 709254d commit 804f3a9

File tree

1 file changed

+11
-14
lines changed
  • smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test

1 file changed

+11
-14
lines changed

smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaBroker.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.kafka.storage.internals.log.CleanerConfig;
4141
import org.jboss.logging.Logger;
4242

43-
import kafka.cluster.EndPoint;
4443
import kafka.server.KafkaConfig;
4544
import kafka.server.KafkaRaftServer;
4645
import scala.jdk.javaapi.StreamConverters;
@@ -254,7 +253,7 @@ public String getAdvertisedListeners() {
254253
}
255254

256255
public List<String> getLogDirs() {
257-
return getLogDirs(config);
256+
return config.logDirs();
258257
}
259258

260259
public int getNodeId() {
@@ -283,7 +282,7 @@ public static Endpoint endpoint(String listener, SecurityProtocol protocol, Stri
283282

284283
public static Endpoint parseEndpoint(SecurityProtocol protocol, String listenerStr) {
285284
Endpoint endpoint = parseEndpoint(listenerStr);
286-
return new Endpoint(endpoint.listenerName().orElse(protocol.name), protocol, endpoint.host(), endpoint.port());
285+
return new Endpoint(listenerName(endpoint), protocol, endpoint.host(), endpoint.port());
287286
}
288287

289288
public static Endpoint parseEndpoint(String listenerStr) {
@@ -311,7 +310,7 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll
311310

312311
// Configure listeners
313312
Map<String, Endpoint> listeners = advertisedListeners.stream()
314-
.map(l -> new Endpoint(l.listenerName().orElse(null), l.securityProtocol(), "", l.port()))
313+
.map(l -> new Endpoint(l.listener(), l.securityProtocol(), "", l.port()))
315314
.collect(Collectors.toMap(EmbeddedKafkaBroker::listenerName, Function.identity()));
316315
listeners.put(listenerName(controller), controller);
317316
listeners.put(listenerName(internalEndpoint), internalEndpoint);
@@ -372,7 +371,7 @@ public static KafkaConfig formatStorageFromConfig(Properties properties, String
372371
.setNodeId(config.nodeId())
373372
.setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled())
374373
.setIgnoreFormatted(ignoreFormatted)
375-
.setControllerListenerName(config.controllerListenerNames().head())
374+
.setControllerListenerName(config.controllerListenerNames().get(0))
376375
.setMetadataLogDirectory(config.metadataLogDir());
377376
configToLogDirectories(config).forEach(formatter::addDirectory);
378377
try {
@@ -384,8 +383,7 @@ public static KafkaConfig formatStorageFromConfig(Properties properties, String
384383
}
385384

386385
static Set<String> configToLogDirectories(KafkaConfig config) {
387-
TreeSet<String> dirs = new TreeSet<>();
388-
config.logDirs().foreach(dirs::add);
386+
TreeSet<String> dirs = new TreeSet<>(config.logDirs());
389387
String metadataLogDir = config.metadataLogDir();
390388
if (metadataLogDir != null) {
391389
dirs.add(metadataLogDir);
@@ -422,15 +420,10 @@ public static KafkaRaftServer createServer(final KafkaConfig config) {
422420

423421
private static String getAdvertisedListeners(KafkaConfig config) {
424422
return StreamConverters.asJavaParStream(config.effectiveAdvertisedBrokerListeners())
425-
.map(EndPoint::connectionString)
423+
.map(EmbeddedKafkaBroker::toListenerString)
426424
.collect(Collectors.joining(","));
427425
}
428426

429-
private static List<String> getLogDirs(KafkaConfig config) {
430-
return StreamConverters.asJavaParStream(config.logDirs())
431-
.collect(Collectors.toList());
432-
}
433-
434427
private static int getUnusedPort(int port) {
435428
if (port != 0) {
436429
return port;
@@ -464,7 +457,11 @@ private static String toProtocolMap(Endpoint endpoint) {
464457
}
465458

466459
private static String listenerName(Endpoint endpoint) {
467-
return endpoint.listenerName().orElse(endpoint.securityProtocol().name);
460+
String listener = endpoint.listener();
461+
if (listener != null && !listener.isBlank()) {
462+
return listener;
463+
}
464+
return endpoint.securityProtocol().name;
468465
}
469466

470467
public static class LoggingOutputStream extends java.io.OutputStream {

0 commit comments

Comments
 (0)