Skip to content

Commit b1ab9ca

Browse files
JvD-Ericssondanielgospodinow
authored andcommitted
Upgrade kafka to 3.9.1 (linkedin#2286)
1 parent bcb155b commit b1ab9ca

File tree

16 files changed

+419
-30
lines changed

16 files changed

+419
-30
lines changed

build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,12 @@ project(':cruise-control') {
321321
api 'org.json:json:20231013'
322322
api 'org.xerial.snappy:snappy-java:1.1.10.5'
323323

324+
constraints {
325+
implementation("commons-beanutils:commons-beanutils:1.11.0") {
326+
because("version 1.9.4 pulled from kafa 3.9.1 has CVE-2025-48734 in it, which is fixed in 1.11.0")
327+
}
328+
}
329+
324330
testImplementation project(path: ':cruise-control-metrics-reporter', configuration: 'testOutput')
325331
testImplementation project(path: ':cruise-control-core', configuration: 'testOutput')
326332
testImplementation "org.scala-lang:scala-library:$scalaVersion"

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.util.StringJoiner;
1111
import java.util.concurrent.atomic.AtomicInteger;
1212
import org.apache.kafka.common.config.SslConfigs;
13-
import org.apache.kafka.common.network.Mode;
1413
import org.apache.kafka.common.security.auth.SecurityProtocol;
1514
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
1615
import org.apache.kafka.network.SocketServerConfigs;
@@ -19,7 +18,6 @@
1918
import org.apache.kafka.server.config.ServerLogConfigs;
2019
import org.apache.kafka.server.config.ZkConfigs;
2120
import org.apache.kafka.storage.internals.log.CleanerConfig;
22-
import org.apache.kafka.test.TestSslUtils;
2321

2422

2523
public class CCEmbeddedBrokerBuilder {
@@ -282,7 +280,8 @@ public Map<Object, Object> buildConfig() {
282280
}
283281
if (_trustStore != null || _sslPort > 0) {
284282
try {
285-
props.putAll(TestSslUtils.createSslConfig(false, true, Mode.SERVER, _trustStore, "server" + _nodeId));
283+
props.putAll(CCSslTestUtils.createSslConfig(false, true,
284+
CCSslTestUtils.ConnectionMode.SERVER, _trustStore, "server" + _nodeId));
286285
// Switch interbroker to ssl
287286
props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
288287
} catch (Exception e) {

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaClientsIntegrationTestHarness.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
import org.apache.kafka.clients.producer.Producer;
1212
import org.apache.kafka.clients.producer.KafkaProducer;
1313
import org.apache.kafka.common.config.SslConfigs;
14-
import org.apache.kafka.common.network.Mode;
1514
import org.apache.kafka.common.security.auth.SecurityProtocol;
1615
import org.apache.kafka.common.serialization.StringSerializer;
17-
import org.apache.kafka.test.TestSslUtils;
1816

1917

2018
public abstract class CCKafkaClientsIntegrationTestHarness extends CCKafkaIntegrationTestHarness {
@@ -58,7 +56,8 @@ protected void setSecurityConfigs(Properties clientProps, String certAlias) {
5856
clientProps.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
5957
clientProps.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
6058
try {
61-
clientProps.putAll(TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, certAlias));
59+
clientProps.putAll(CCSslTestUtils.createSslConfig(true, true,
60+
CCSslTestUtils.ConnectionMode.CLIENT, trustStoreFile, certAlias));
6261
} catch (Exception e) {
6362
throw new IllegalStateException(e);
6463
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright 2025 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").See License in the project root for license information.
3+
*/
4+
5+
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;
6+
7+
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCSslTestUtils.ConnectionMode;
8+
import org.apache.kafka.common.config.SslConfigs;
9+
import org.apache.kafka.common.config.types.Password;
10+
import org.bouncycastle.asn1.x500.X500Name;
11+
import javax.net.ssl.TrustManagerFactory;
12+
import java.io.File;
13+
import java.io.IOException;
14+
import java.security.GeneralSecurityException;
15+
import java.security.KeyPair;
16+
import java.security.cert.X509Certificate;
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Locale;
21+
import java.util.Map;
22+
23+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCSslTestUtils.createKeyStore;
24+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCSslTestUtils.createTrustStore;
25+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCSslTestUtils.generate;
26+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCSslTestUtils.generateKeyPair;
27+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCSslTestUtils.tempFile;
28+
29+
public class CCSslConfigsBuilder {
30+
private final ConnectionMode _connectionMode;
31+
private String _tlsProtocol;
32+
private boolean _useClientCert;
33+
private boolean _createTrustStore;
34+
private File _trustStoreFile;
35+
private Password _trustStorePassword;
36+
private Password _keyStorePassword;
37+
private Password _keyPassword;
38+
private String _certAlias;
39+
private String _cn;
40+
private String _algorithm;
41+
42+
public CCSslConfigsBuilder(ConnectionMode connectionMode) {
43+
this._connectionMode = connectionMode;
44+
this._tlsProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL;
45+
this._trustStorePassword = new Password("TrustStorePassword");
46+
this._keyStorePassword = connectionMode == CCSslTestUtils.ConnectionMode.SERVER ? new Password("ServerPassword")
47+
: new Password("ClientPassword");
48+
this._keyPassword = this._keyStorePassword;
49+
this._cn = "localhost";
50+
this._certAlias = connectionMode.name().toLowerCase(Locale.ROOT);
51+
this._algorithm = "RSA";
52+
this._createTrustStore = true;
53+
}
54+
55+
/**
56+
* Set trust store file and create a new trust store to true.
57+
*
58+
* @param trustStoreFile Trust store file to set.
59+
* @return This.
60+
*/
61+
public CCSslConfigsBuilder createNewTrustStore(File trustStoreFile) {
62+
this._trustStoreFile = trustStoreFile;
63+
this._createTrustStore = true;
64+
return this;
65+
}
66+
67+
/**
68+
* Set trust store file and create a new trust store to false.
69+
*
70+
* @param trustStoreFile The existing trust store file to use.
71+
* @return This.
72+
*/
73+
public CCSslConfigsBuilder useExistingTrustStore(File trustStoreFile) {
74+
this._trustStoreFile = trustStoreFile;
75+
this._createTrustStore = false;
76+
return this;
77+
}
78+
79+
/**
80+
* Set use client cert.
81+
*
82+
* @param useClientCert Whether to use a client certificate.
83+
* @return This.
84+
*/
85+
public CCSslConfigsBuilder useClientCert(boolean useClientCert) {
86+
this._useClientCert = useClientCert;
87+
return this;
88+
}
89+
90+
/**
91+
* Set use cert alias.
92+
*
93+
* @param certAlias The alias for the certificate in the trust store.
94+
* @return This.
95+
*/
96+
public CCSslConfigsBuilder certAlias(String certAlias) {
97+
this._certAlias = certAlias;
98+
return this;
99+
}
100+
101+
/**
102+
* Set cn.
103+
*
104+
* @param cn The common name (CN) for the certificate.
105+
* @return This.
106+
*/
107+
public CCSslConfigsBuilder cn(String cn) {
108+
this._cn = cn;
109+
return this;
110+
}
111+
112+
/**
113+
* Builds the Java KeyStore (JKS)-based SSL configuration.
114+
* Depending on the connection mode (CLIENT or SERVER), this method generates the required
115+
* key pairs and self-signed certificates, creates keystores, and optionally creates a trust store.
116+
*
117+
* @return A map of SSL configuration properties suitable for use with Kafka or other TLS-based systems.
118+
* @throws IOException If file operations fail during keystore or trust store creation.
119+
* @throws GeneralSecurityException If a cryptographic error occurs while generating keys or certificates.
120+
*/
121+
public Map<String, Object> buildJks() throws IOException, GeneralSecurityException {
122+
Map<String, X509Certificate> certs = new HashMap<>();
123+
File keyStoreFile = null;
124+
if (this._connectionMode == CCSslTestUtils.ConnectionMode.CLIENT && this._useClientCert) {
125+
keyStoreFile = tempFile("clientKS", ".jks");
126+
KeyPair cKP = generateKeyPair(this._algorithm);
127+
X509Certificate cCert = generate(new X500Name("CN=" + _cn + ", O=A client"), cKP);
128+
createKeyStore(keyStoreFile.getPath(), this._keyStorePassword, this._keyPassword, "client",
129+
cKP.getPrivate(), cCert);
130+
certs.put(this._certAlias, cCert);
131+
} else if (this._connectionMode == CCSslTestUtils.ConnectionMode.SERVER) {
132+
keyStoreFile = tempFile("serverKS", ".jks");
133+
KeyPair sKP = generateKeyPair(this._algorithm);
134+
X509Certificate sCert = generate(new X500Name("CN=" + _cn + ", O=A server"), sKP);
135+
createKeyStore(keyStoreFile.getPath(), this._keyStorePassword, this._keyPassword, "server",
136+
sKP.getPrivate(), sCert);
137+
certs.put(this._certAlias, sCert);
138+
keyStoreFile.deleteOnExit();
139+
}
140+
141+
if (this._createTrustStore) {
142+
createTrustStore(this._trustStoreFile.getPath(), this._trustStorePassword, certs);
143+
this._trustStoreFile.deleteOnExit();
144+
}
145+
146+
Map<String, Object> sslConfigs = new HashMap<>();
147+
sslConfigs.put("ssl.protocol", this._tlsProtocol);
148+
if (this._connectionMode == CCSslTestUtils.ConnectionMode.SERVER || this._connectionMode == CCSslTestUtils.ConnectionMode.CLIENT
149+
&& keyStoreFile != null) {
150+
sslConfigs.put("ssl.keystore.location", keyStoreFile.getPath());
151+
sslConfigs.put("ssl.keystore.type", "JKS");
152+
sslConfigs.put("ssl.keymanager.algorithm", TrustManagerFactory.getDefaultAlgorithm());
153+
sslConfigs.put("ssl.keystore.password", this._keyStorePassword);
154+
sslConfigs.put("ssl.key.password", this._keyPassword);
155+
}
156+
157+
sslConfigs.put("ssl.truststore.location", this._trustStoreFile.getPath());
158+
sslConfigs.put("ssl.truststore.password", this._trustStorePassword);
159+
sslConfigs.put("ssl.truststore.type", "JKS");
160+
sslConfigs.put("ssl.trustmanager.algorithm", TrustManagerFactory.getDefaultAlgorithm());
161+
List<String> enabledProtocols = new ArrayList<>();
162+
enabledProtocols.add(this._tlsProtocol);
163+
sslConfigs.put("ssl.enabled.protocols", enabledProtocols);
164+
return sslConfigs;
165+
}
166+
}

0 commit comments

Comments
 (0)