Skip to content

Commit 8b8dc62

Browse files
Bump kafka.version from 4.0.0 to 4.1.0 (#3141)
* Bump kafka.version from 4.0.0 to 4.1.0 Bumps `kafka.version` from 4.0.0 to 4.1.0. Updates `org.apache.kafka:kafka-clients` from 4.0.0 to 4.1.0 Updates `org.apache.kafka:kafka_2.13` from 4.0.0 to 4.1.0 Updates `org.apache.kafka:kafka-server-common` from 4.0.0 to 4.1.0 Updates `org.apache.kafka:kafka-metadata` from 4.0.0 to 4.1.0 Updates `org.apache.kafka:kafka-server` from 4.0.0 to 4.1.0 Updates `org.apache.kafka:kafka-raft` from 4.0.0 to 4.1.0 Updates `org.apache.kafka:kafka-group-coordinator` from 4.0.0 to 4.1.0 Updates `org.apache.kafka:kafka-transaction-coordinator` from 4.0.0 to 4.1.0 Updates `org.apache.kafka:kafka-storage` from 4.0.0 to 4.1.0 --- updated-dependencies: - dependency-name: org.apache.kafka:kafka-clients dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.kafka:kafka_2.13 dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.kafka:kafka-server-common dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.kafka:kafka-metadata dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.kafka:kafka-server dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.kafka:kafka-raft dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.kafka:kafka-group-coordinator dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.kafka:kafka-transaction-coordinator dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: org.apache.kafka:kafka-storage dependency-version: 4.1.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <[email protected]> * Kafka 4.1.0 removes kafka.cluster.EndPoint --------- Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Ozan Gunalp <[email protected]>
1 parent 6eeb983 commit 8b8dc62

File tree

2 files changed

+12
-15
lines changed

2 files changed

+12
-15
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@
8787

8888
<jboss-log-manager.version>3.1.2.Final</jboss-log-manager.version>
8989

90-
<kafka.version>4.0.0</kafka.version>
90+
<kafka.version>4.1.0</kafka.version>
9191

9292
<opentelemetry.instrumentation.version>2.21.0</opentelemetry.instrumentation.version>
9393
<opentelemetry.version>1.55.0</opentelemetry.version>

smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaBroker.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.kafka.storage.internals.log.CleanerConfig;
4141
import org.jboss.logging.Logger;
4242

43-
import kafka.cluster.EndPoint;
4443
import kafka.server.KafkaConfig;
4544
import kafka.server.KafkaRaftServer;
4645
import scala.jdk.javaapi.StreamConverters;
@@ -254,7 +253,7 @@ public String getAdvertisedListeners() {
254253
}
255254

256255
public List<String> getLogDirs() {
257-
return getLogDirs(config);
256+
return config.logDirs();
258257
}
259258

260259
public int getNodeId() {
@@ -283,7 +282,7 @@ public static Endpoint endpoint(String listener, SecurityProtocol protocol, Stri
283282

284283
public static Endpoint parseEndpoint(SecurityProtocol protocol, String listenerStr) {
285284
Endpoint endpoint = parseEndpoint(listenerStr);
286-
return new Endpoint(endpoint.listenerName().orElse(protocol.name), protocol, endpoint.host(), endpoint.port());
285+
return new Endpoint(listenerName(endpoint), protocol, endpoint.host(), endpoint.port());
287286
}
288287

289288
public static Endpoint parseEndpoint(String listenerStr) {
@@ -311,7 +310,7 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll
311310

312311
// Configure listeners
313312
Map<String, Endpoint> listeners = advertisedListeners.stream()
314-
.map(l -> new Endpoint(l.listenerName().orElse(null), l.securityProtocol(), "", l.port()))
313+
.map(l -> new Endpoint(l.listener(), l.securityProtocol(), "", l.port()))
315314
.collect(Collectors.toMap(EmbeddedKafkaBroker::listenerName, Function.identity()));
316315
listeners.put(listenerName(controller), controller);
317316
listeners.put(listenerName(internalEndpoint), internalEndpoint);
@@ -372,7 +371,7 @@ public static KafkaConfig formatStorageFromConfig(Properties properties, String
372371
.setNodeId(config.nodeId())
373372
.setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled())
374373
.setIgnoreFormatted(ignoreFormatted)
375-
.setControllerListenerName(config.controllerListenerNames().head())
374+
.setControllerListenerName(config.controllerListenerNames().get(0))
376375
.setMetadataLogDirectory(config.metadataLogDir());
377376
configToLogDirectories(config).forEach(formatter::addDirectory);
378377
try {
@@ -384,8 +383,7 @@ public static KafkaConfig formatStorageFromConfig(Properties properties, String
384383
}
385384

386385
static Set<String> configToLogDirectories(KafkaConfig config) {
387-
TreeSet<String> dirs = new TreeSet<>();
388-
config.logDirs().foreach(dirs::add);
386+
TreeSet<String> dirs = new TreeSet<>(config.logDirs());
389387
String metadataLogDir = config.metadataLogDir();
390388
if (metadataLogDir != null) {
391389
dirs.add(metadataLogDir);
@@ -422,15 +420,10 @@ public static KafkaRaftServer createServer(final KafkaConfig config) {
422420

423421
private static String getAdvertisedListeners(KafkaConfig config) {
424422
return StreamConverters.asJavaParStream(config.effectiveAdvertisedBrokerListeners())
425-
.map(EndPoint::connectionString)
423+
.map(EmbeddedKafkaBroker::toListenerString)
426424
.collect(Collectors.joining(","));
427425
}
428426

429-
private static List<String> getLogDirs(KafkaConfig config) {
430-
return StreamConverters.asJavaParStream(config.logDirs())
431-
.collect(Collectors.toList());
432-
}
433-
434427
private static int getUnusedPort(int port) {
435428
if (port != 0) {
436429
return port;
@@ -464,7 +457,11 @@ private static String toProtocolMap(Endpoint endpoint) {
464457
}
465458

466459
private static String listenerName(Endpoint endpoint) {
467-
return endpoint.listenerName().orElse(endpoint.securityProtocol().name);
460+
String listener = endpoint.listener();
461+
if (listener != null && !listener.isBlank()) {
462+
return listener;
463+
}
464+
return endpoint.securityProtocol().name;
468465
}
469466

470467
public static class LoggingOutputStream extends java.io.OutputStream {

0 commit comments

Comments
 (0)