Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
Expand Down Expand Up @@ -282,7 +282,7 @@ public Map<Object, Object> buildConfig() {
}
if (_trustStore != null || _sslPort > 0) {
try {
props.putAll(TestSslUtils.createSslConfig(false, true, Mode.SERVER, _trustStore, "server" + _nodeId));
props.putAll(TestSslUtils.createSslConfig(false, true, ConnectionMode.SERVER, _trustStore, "server" + _nodeId));
// Switch interbroker to ssl
props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.test.TestSslUtils;
Expand Down Expand Up @@ -58,7 +58,7 @@ protected void setSecurityConfigs(Properties clientProps, String certAlias) {
clientProps.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
clientProps.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
try {
clientProps.putAll(TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, certAlias));
clientProps.putAll(TestSslUtils.createSslConfig(true, true, ConnectionMode.CLIENT, trustStoreFile, certAlias));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will break compilation against Kafka versions earlier than 3.9.0. However, the built artifact will still run against earlier Kafka versions at runtime, since the affected changes are limited to test code. We made a similar decision when we upgraded the Cruise Control code base to Kafka 3.8.0.

Unless we start migrating the Cruise Control code base off of non-public Kafka API usages, we will continue to encounter breaking changes every time we upgrade Kafka. This is due to the unstable and frequently changing nature of Kafka’s internal APIs.

As an alternative to this change here, we could implement our own createSslConfig method that avoids internal Kafka APIs and use that to generate the SSL configurations for tests. This would require a bit more effort but would move us closer to eliminating our dependence on internal Kafka APIs while preserving compatibility with Kafka 3.8.0.

} catch (Exception e) {
throw new IllegalStateException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -548,7 +548,7 @@ public static KafkaZkClient createKafkaZkClient(String connectString,
org.apache.kafka.common.utils.Time.class, String.class, ZKClientConfig.class,
String.class, String.class, boolean.class, boolean.class);
kafkaZkClient = (KafkaZkClient) kafka38PlusMet.invoke(null, connectString, zkSecurityEnabled, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
Integer.MAX_VALUE, new SystemTime(), zkClientName, zkClientConfig, metricGroup,
Integer.MAX_VALUE, Time.SYSTEM, zkClientName, zkClientConfig, metricGroup,
metricType, false, true);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.debug("Unable to find apply method in KafkaZkClient for Kafka 3.8+.", e);
Expand All @@ -560,7 +560,7 @@ Integer.MAX_VALUE, new SystemTime(), zkClientName, zkClientConfig, metricGroup,
org.apache.kafka.common.utils.Time.class, String.class, ZKClientConfig.class,
String.class, String.class, boolean.class);
kafkaZkClient = (KafkaZkClient) kafka31PlusMet.invoke(null, connectString, zkSecurityEnabled, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
Integer.MAX_VALUE, new SystemTime(), zkClientName, zkClientConfig, metricGroup,
Integer.MAX_VALUE, Time.SYSTEM, zkClientName, zkClientConfig, metricGroup,
metricType, false);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.debug("Unable to find apply method in KafkaZkClient for Kafka 3.1+.", e);
Expand All @@ -574,7 +574,7 @@ Integer.MAX_VALUE, new SystemTime(), zkClientName, zkClientConfig, metricGroup,
org.apache.kafka.common.utils.Time.class, String.class, String.class, Option.class,
Option.class);
kafkaZkClient = (KafkaZkClient) kafka31MinusMet.invoke(null, connectString, zkSecurityEnabled, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
Integer.MAX_VALUE, new SystemTime(), metricGroup, metricType, zkClientName, zkConfig);
Integer.MAX_VALUE, Time.SYSTEM, metricGroup, metricType, zkClientName, zkConfig);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.debug("Unable to find apply method in KafkaZkClient for Kafka 3.1-.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -172,7 +171,7 @@ public AnomalyDetectorManager(KafkaCruiseControl kafkaCruiseControl, Time time,
_selfHealingFixGenerationTimer = new HashMap<>();
cachedValues().forEach(anomalyType -> _selfHealingFixGenerationTimer.put(anomalyType, new Timer()));
// Add anomaly detector state
_anomalyDetectorState = new AnomalyDetectorState(new SystemTime(), _anomalyNotifier, 10, null);
_anomalyDetectorState = new AnomalyDetectorState(Time.SYSTEM, _anomalyNotifier, 10, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,7 +80,7 @@ public class SelfHealingNotifier implements AnomalyNotifier {
protected final Map<Boolean, Map<Integer, Long>> _latestFailedBrokersByAutoFixTriggered;

public SelfHealingNotifier() {
this(new SystemTime());
this(Time.SYSTEM);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.regex.Pattern;
import junit.framework.AssertionFailedError;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -106,7 +106,7 @@ private GoalOptimizer createGoalOptimizer(Properties overrideProps) {
props.putAll(overrideProps);
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(props);

return new GoalOptimizer(config, EasyMock.mock(LoadMonitor.class), new SystemTime(), new MetricRegistry(),
return new GoalOptimizer(config, EasyMock.mock(LoadMonitor.class), Time.SYSTEM, new MetricRegistry(),
EasyMock.mock(Executor.class), EasyMock.mock(AdminClient.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;


Expand Down Expand Up @@ -56,7 +56,7 @@ public static void main(String[] argv) throws Exception {
// Instantiate the components.
GoalOptimizer goalOptimizer = new GoalOptimizer(config,
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -140,7 +140,7 @@ static boolean executeGoalsFor(BalancingConstraint constraint,
props.setProperty(AnalyzerConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG, stringJoiner.toString());
GoalOptimizer goalOptimizer = new GoalOptimizer(new KafkaCruiseControlConfig(constraint.setProps(props)),
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -112,7 +112,7 @@ public void testRackIdMapper() throws Exception {
_goalToTest.configure(kafkaCruiseControlConfig.mergedConfigValues());
GoalOptimizer goalOptimizer = new GoalOptimizer(kafkaCruiseControlConfig,
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testWithoutRackIdMapper() throws Exception {
_goalToTest.configure(kafkaCruiseControlConfig.mergedConfigValues());
GoalOptimizer goalOptimizer = new GoalOptimizer(kafkaCruiseControlConfig,
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -114,7 +114,7 @@ public void testRemoveDisks() throws KafkaCruiseControlException {
_goalToTest.configure(_kafkaCruiseControlConfig.mergedConfigValues());
GoalOptimizer goalOptimizer = new GoalOptimizer(_kafkaCruiseControlConfig,
null,
new SystemTime(),
Time.SYSTEM,
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -65,7 +65,7 @@ public static void setup() {
Integer.toString(MOCK_DEFAULT_CONCURRENCY.get(ConcurrencyType.LEADERSHIP_BROKER)));
properties.put(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG,
Integer.toString(MOCK_DEFAULT_CONCURRENCY.get(ConcurrencyType.INTRA_BROKER_REPLICA)));
taskManager = new ExecutionTaskManager(null, new MetricRegistry(), new SystemTime(),
taskManager = new ExecutionTaskManager(null, new MetricRegistry(), Time.SYSTEM,
new KafkaCruiseControlConfig(properties));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.client.ZKClientConfig;
import org.easymock.Capture;
Expand Down Expand Up @@ -780,7 +779,7 @@ private void executeAndVerifyProposals(KafkaZkClient kafkaZkClient,
EasyMock.replay(mockUserTaskInfo, mockExecutorNotifier, mockLoadMonitor, mockAnomalyDetectorManager);
}
MetricRegistry metricRegistry = new MetricRegistry();
Executor executor = new Executor(configs, new SystemTime(), metricRegistry, null, mockExecutorNotifier,
Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, null, mockExecutorNotifier,
mockAnomalyDetectorManager);
executor.setUserTaskManager(mockUserTaskManager);
Map<TopicPartition, Integer> replicationFactors = new HashMap<>();
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ org.gradle.daemon=false
org.gradle.parallel=false
org.gradle.jvmargs=-Xms512m -Xmx512m
scalaVersion=2.13.13
kafkaVersion=3.8.0
kafkaVersion=3.9.0
zookeeperVersion=3.9.3
nettyVersion=4.1.114.Final
jettyVersion=9.4.56.v20240826
Expand Down