Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Cruise Control for Apache Kafka
* Metric anomaly detection
* Disk failure detection (not available in `kafka_0_11_and_1_0` branch)
* Slow broker detection (not available in `kafka_0_11_and_1_0` branch)
* Intra-Broker goal violation

* Admin operations, including:
* Add brokers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void close() {
static String getBootstrapServers(Map<String, ?> configs) {
Object port = configs.get("port");
String listeners = String.valueOf(configs.get("listeners"));
if (!"null".equals(listeners) && listeners.length() != 0) {
if (!"null".equals(listeners) && !listeners.isEmpty()) {
// See https://kafka.apache.org/documentation/#listeners for possible responses. If multiple listeners are configured, this function
// picks the first listener in the list of listeners. Hence, users of this config must adjust their order accordingly.
String firstListener = listeners.split("\\s*,\\s*")[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,25 @@ public ClusterModel clusterModel(ModelCompletenessRequirements requirements,
return _loadMonitor.clusterModel(timeMs(), requirements, allowCapacityEstimation, operationProgress);
}

/**
 * Build the cluster workload model as of the current time, optionally including replica placement information.
 *
 * @param requirements The model completeness requirements the generated model must satisfy.
 * @param allowCapacityEstimation Whether capacity estimation is allowed in the cluster model when the capacity of an
 *                                underlying live broker is unavailable.
 * @param operationProgress The progress tracker of the job to report to.
 * @param populateReplicaPlacementInfo Whether replica placement information should be populated in the model.
 * @return The cluster workload model.
 * @throws NotEnoughValidWindowsException If there are not enough samples to generate the cluster model.
 * @throws TimeoutException If the broker capacity resolver is unable to resolve broker capacity in time.
 * @throws BrokerCapacityResolutionException If the broker capacity resolver fails to resolve broker capacity.
 */
public ClusterModel clusterModel(ModelCompletenessRequirements requirements,
                                 boolean allowCapacityEstimation,
                                 OperationProgress operationProgress,
                                 boolean populateReplicaPlacementInfo)
    throws NotEnoughValidWindowsException, TimeoutException, BrokerCapacityResolutionException {
  long nowMs = timeMs();
  // The load monitor takes populateReplicaPlacementInfo BEFORE allowCapacityEstimation, i.e. the two booleans are
  // passed in the opposite order of this method's own parameter list.
  return _loadMonitor.clusterModel(nowMs,
                                   requirements,
                                   populateReplicaPlacementInfo,
                                   allowCapacityEstimation,
                                   operationProgress);
}

/**
* Get the cluster model for a given time window.
* @param from the start time of the window
Expand Down Expand Up @@ -684,7 +703,7 @@ public void executeProposals(Set<ExecutionProposal> proposals,
* Execute the given balancing proposals for remove operations.
* @param proposals the given balancing proposals
* @param throttleDecommissionedBroker Whether throttle the brokers that are being decommissioned.
* @param removedBrokers Brokers to be removed, null if no brokers has been removed.
* @param removedBrokers Brokers to be removed, null if no brokers have been removed.
* @param isKafkaAssignerMode {@code true} if kafka assigner mode, {@code false} otherwise.
* @param concurrentInterBrokerPartitionMovements The maximum number of concurrent inter-broker partition movements per broker
* (if null, use num.concurrent.partition.movements.per.broker).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ public static List<Goal> getDefaultGoalsByPriority(KafkaCruiseControlConfig conf
return config.getConfiguredInstances(AnalyzerConfig.DEFAULT_GOALS_CONFIG, Goal.class);
}

/**
 * Instantiate the goals listed under {@link AnalyzerConfig#INTRA_BROKER_GOALS_CONFIG}.
 *
 * @param config The configurations for Cruise Control.
 * @return The list of intra broker goals sorted by highest to lowest default priority.
 */
public static List<Goal> getIntraBrokerGoalsByPriority(KafkaCruiseControlConfig config) {
  List<Goal> intraBrokerGoals = config.getConfiguredInstances(AnalyzerConfig.INTRA_BROKER_GOALS_CONFIG, Goal.class);
  return intraBrokerGoals;
}

/**
* @param config The configurations for Cruise Control.
* @return A goal map with goal name as the keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class GoalOptimizer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(GoalOptimizer.class);
private static final long HALF_MINUTE_IN_MS = TimeUnit.SECONDS.toMillis(30);
private final List<Goal> _goalsByPriority;
private final List<Goal> _intraBrokerGoals;
private final BalancingConstraint _balancingConstraint;
private final Pattern _defaultExcludedTopics;
private final LoadMonitor _loadMonitor;
Expand Down Expand Up @@ -104,6 +105,7 @@ public GoalOptimizer(KafkaCruiseControlConfig config,
Executor executor,
AdminClient adminClient) {
_goalsByPriority = AnalyzerUtils.getDefaultGoalsByPriority(config);
_intraBrokerGoals = AnalyzerUtils.getIntraBrokerGoalsByPriority(config);
_defaultModelCompletenessRequirements = MonitorUtils.combineLoadRequirementOptions(_goalsByPriority);
_requirementsWithAvailableValidWindows = new ModelCompletenessRequirements(
1,
Expand All @@ -127,7 +129,7 @@ public GoalOptimizer(KafkaCruiseControlConfig config,
_proposalPrecomputingProgress = new OperationProgress();
_proposalComputationTimer = dropwizardMetricRegistry.timer(MetricRegistry.name(GOAL_OPTIMIZER_SENSOR, "proposal-computation-timer"));

// The cluster is identified as unfixable if combined goals can not be fixed
// The cluster is identified as unfixable if combined goals cannot be fixed
dropwizardMetricRegistry.register(MetricRegistry.name(GOAL_OPTIMIZER_SENSOR, "has-unfixable-proposal-optimization"),
(Gauge<Integer>) () -> hasUnfixableProposalOptimization() ? 1 : 0);
_hasUnfixableProposalOptimization = false;
Expand All @@ -151,7 +153,7 @@ private boolean hasUnfixableProposalOptimization() {

@Override
public void run() {
// We need to get this thread so it can be interrupted if the cached proposal has been invalidated.
// We need to get this thread, so it can be interrupted if the cached proposal has been invalidated.
_proposalPrecomputingSchedulerThread = Thread.currentThread();
LOG.info("Starting proposal candidate computation.");
while (!_shutdown && _numPrecomputingThreads > 0) {
Expand Down Expand Up @@ -545,6 +547,25 @@ public Set<String> excludedTopics(ClusterModel clusterModel, Pattern requestedEx
return Utils.getTopicNamesMatchedWithPattern(topicsToExclude, clusterModel::topics);
}

/**
 * Check whether any of the given goals is one of the configured intra-broker goals ({@link #_intraBrokerGoals}).
 *
 * NOTE(review): the lookup goes through a {@link HashSet}, so a match depends on how {@link Goal} implementations
 * define {@code equals}/{@code hashCode}. Goal lists such as the default goals and the intra-broker goals appear to be
 * instantiated through separate calls, so identity comparison alone would never match -- confirm goals compare by value.
 *
 * @param goals The list of goals to look in.
 * @return {@code true} if at least one of the given goals is an intra-broker goal, {@code false} otherwise.
 */
protected boolean containsIntraBrokerGoal(List<Goal> goals) {
  // Copy into a set once so that each membership test below is a hash lookup.
  Set<Goal> configuredIntraBrokerGoals = new HashSet<>(_intraBrokerGoals);
  // anyMatch short-circuits on the first hit, mirroring an early exit from a loop.
  return goals.stream().anyMatch(configuredIntraBrokerGoals::contains);
}

/**
* Get the broker set resolver
* @return the configured BrokerSetResolver instance
Expand Down Expand Up @@ -600,7 +621,12 @@ public void run() {
// We compute the proposal even if there is not enough modeled partitions.
ModelCompletenessRequirements requirements = _loadMonitor.meetCompletenessRequirements(_defaultModelCompletenessRequirements)
? _defaultModelCompletenessRequirements : _requirementsWithAvailableValidWindows;
ClusterModel clusterModel = _loadMonitor.clusterModel(_time.milliseconds(), requirements, _allowCapacityEstimation, operationProgress);
// We check for Intra broker goals among Default goals - if we have intra broker goals, set populateReplicaPlacementInfo to true
ClusterModel clusterModel = _loadMonitor.clusterModel(_time.milliseconds(),
requirements,
containsIntraBrokerGoal(_goalsByPriority),
_allowCapacityEstimation,
operationProgress);
if (!clusterModel.topics().isEmpty()) {
OptimizerResult result = optimizations(clusterModel, _goalsByPriority, operationProgress);
LOG.debug("Generated a proposal candidate in {} ms.", _time.milliseconds() - startMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ protected void rebalanceForBroker(Broker broker,
OptimizationOptions optimizationOptions) {
double upperLimit = _balanceUpperThresholdByBroker.get(broker);
double lowerLimit = _balanceLowerThresholdByBroker.get(broker);
LOG.debug("balancing broker {}", broker);
LOG.debug("List of broker disks is {}.", broker.disks());
for (Disk disk : broker.disks()) {
if (!disk.isAlive()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand Down Expand Up @@ -153,6 +155,7 @@ public class BrokerCapacityConfigFileResolver implements BrokerCapacityConfigRes
public static final double DEFAULT_CPU_CAPACITY_WITH_CORES = 100.0;
private static Map<Integer, BrokerCapacityInfo> capacitiesForBrokers;
private String _configFile;
private static final Logger LOG = LoggerFactory.getLogger(BrokerCapacityConfigFileResolver.class);

@Override
public void configure(Map<String, ?> configs) {
Expand Down Expand Up @@ -188,6 +191,19 @@ public BrokerCapacityInfo capacityForBroker(String rack, String host, int broker
}
}

/**
 * Decide whether the cluster should be treated as JBOD based on the per-broker capacities held by this resolver.
 * A single broker whose disk capacity is broken down over more than one log dir is enough to answer {@code true};
 * brokers without per-log-dir capacity, or with exactly one log dir, do not count. The outcome is logged at INFO
 * level on every call.
 *
 * NOTE(review): this reads the static {@code capacitiesForBrokers} map without a null check -- presumably it is
 * populated during {@link #configure(Map)}; confirm this is never invoked before configuration.
 *
 * @return {@code true} if at least one broker has more than one log dir with disk capacity, {@code false} otherwise.
 */
@Override
public boolean isJbodKafkaCluster() {
  for (BrokerCapacityInfo capacityInfo : capacitiesForBrokers.values()) {
    boolean hasMultipleLogDirs = capacityInfo.diskCapacityByLogDir() != null
                                 && capacityInfo.diskCapacityByLogDir().size() > 1;
    if (hasMultipleLogDirs) {
      // One JBOD broker is sufficient to classify the whole cluster as JBOD.
      LOG.info("Kafka Cluster is considered as a JBOD type");
      return true;
    }
  }
  LOG.info("Kafka Cluster is considered as a non-JBOD type");
  return false;
}

private static boolean isJBOD(Map<Resource, Object> brokerCapacity) {
return brokerCapacity.get(Resource.DISK) instanceof Map;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ public interface BrokerCapacityConfigResolver extends CruiseControlConfigurable,
*/
BrokerCapacityInfo capacityForBroker(String rack, String host, int brokerId, long timeoutMs, boolean allowCapacityEstimation)
throws TimeoutException, BrokerCapacityResolutionException;
/**
 * Report whether the Kafka cluster is considered to be a JBOD cluster.
 * NOTE(review): the exact criterion is left to the implementation -- confirm what callers expect for mixed clusters.
 *
 * @return {@code true} if this resolver considers the Kafka cluster to be JBOD, {@code false} otherwise.
 */
boolean isJbodKafkaCluster();
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ private void sanityCheckGoalNames() {
AnomalyDetectorConfig.SELF_HEALING_GOALS_CONFIG, selfHealingGoalNames,
AnalyzerConfig.DEFAULT_GOALS_CONFIG, defaultGoalNames));
}

// Ensure that every intra-broker goal used for self-healing is also listed in the configured intra-broker goals (intra.broker.goals).
List<String> selfHealingIntraBrokerGoalNames = getList(AnomalyDetectorConfig.SELF_HEALING_INTRA_BROKER_GOALS_CONFIG);
if (selfHealingIntraBrokerGoalNames.stream().anyMatch(g -> !intraBrokerGoalNames.contains(g))) {
throw new ConfigException(String.format("Attempt to configure self healing goals with unsupported goals (%s:%s and %s:%s).",
AnomalyDetectorConfig.SELF_HEALING_INTRA_BROKER_GOALS_CONFIG, selfHealingIntraBrokerGoalNames,
AnalyzerConfig.INTRA_BROKER_GOALS_CONFIG, intraBrokerGoalNames));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,10 @@ public final class AnalyzerConfig {
* <code>intra.broker.goals</code>
*/
public static final String INTRA_BROKER_GOALS_CONFIG = "intra.broker.goals";
public static final String DEFAULT_INTRA_BROKER_GOALS = new StringJoiner(",").add(IntraBrokerDiskCapacityGoal.class.getName())
.add(IntraBrokerDiskUsageDistributionGoal.class.getName()).toString();
public static final String DEFAULT_INTRA_BROKER_GOALS = new StringJoiner(",")
.add(IntraBrokerDiskCapacityGoal.class.getName())
.add(IntraBrokerDiskUsageDistributionGoal.class.getName())
.toString();
public static final String INTRA_BROKER_GOALS_DOC = "A list of case insensitive intra-broker goals in the order of priority. "
+ "The high priority goals will be executed first. The intra-broker goals are only relevant if intra-broker operation is "
+ "supported (i.e. in Cruise Control versions above 2.*), otherwise this list should be empty.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal;
import com.linkedin.kafka.cruisecontrol.detector.BasicProvisioner;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import com.linkedin.kafka.cruisecontrol.detector.DiskFailures;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import com.linkedin.kafka.cruisecontrol.detector.IntraBrokerGoalViolations;
import com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEvent;
import com.linkedin.kafka.cruisecontrol.detector.NoopMaintenanceEventReader;
Expand Down Expand Up @@ -65,6 +68,13 @@ public final class AnomalyDetectorConfig {
public static final String DEFAULT_GOAL_VIOLATIONS_CLASS = GoalViolations.class.getName();
public static final String GOAL_VIOLATIONS_CLASS_DOC = "The name of class that extends goal violations.";

/**
* <code>intra.broker.goal.violations.class</code>
*/
public static final String INTRA_BROKER_GOAL_VIOLATIONS_CLASS_CONFIG = "intra.broker.goal.violations.class";
public static final String DEFAULT_INTRA_BROKER_GOAL_VIOLATIONS_CLASS = IntraBrokerGoalViolations.class.getName();
public static final String INTRA_BROKER_GOAL_VIOLATIONS_CLASS_DOC = "The name of class that extends intra broker goal violations.";

/**
* <code>disk.failures.class</code>
*/
Expand All @@ -87,6 +97,15 @@ public final class AnomalyDetectorConfig {
public static final String SELF_HEALING_GOALS_DOC = "The list of goals to be used for self-healing relevant anomalies."
+ " If empty, uses the default.goals for self healing.";

/**
* <code>self.healing.intra.broker.goals</code>
*/
public static final String SELF_HEALING_INTRA_BROKER_GOALS_CONFIG = "self.healing.intra.broker.goals";
public static final List<String> DEFAULT_SELF_HEALING_INTRA_BROKER_GOALS = List.of(
IntraBrokerDiskCapacityGoal.class.getName(),
IntraBrokerDiskUsageDistributionGoal.class.getName());
public static final String SELF_HEALING_INTRA_BROKER_GOALS_DOC = "The list of intra broker goals to be used for self-healing relevant anomalies.";

/**
* <code>anomaly.notifier.class</code>
*/
Expand All @@ -98,12 +117,25 @@ public final class AnomalyDetectorConfig {
* <code>anomaly.detection.goals</code>
*/
public static final String ANOMALY_DETECTION_GOALS_CONFIG = "anomaly.detection.goals";
public static final String DEFAULT_ANOMALY_DETECTION_GOALS = new StringJoiner(",").add(RackAwareGoal.class.getName())
.add(MinTopicLeadersPerBrokerGoal.class.getName())
.add(ReplicaCapacityGoal.class.getName())
.add(DiskCapacityGoal.class.getName()).toString();
public static final String DEFAULT_ANOMALY_DETECTION_GOALS = new StringJoiner(",")
.add(RackAwareGoal.class.getName())
.add(MinTopicLeadersPerBrokerGoal.class.getName())
.add(ReplicaCapacityGoal.class.getName())
.add(DiskCapacityGoal.class.getName())
.toString();
public static final String ANOMALY_DETECTION_GOALS_DOC = "The goals that anomaly detector should detect if they are violated.";

/**
* <code>anomaly.detection.intra.broker.goals</code>
*/
public static final String ANOMALY_DETECTION_INTRA_BROKER_GOALS_CONFIG = "anomaly.detection.intra.broker.goals";
public static final String DEFAULT_ANOMALY_DETECTION_INTRA_BROKER_GOALS = new StringJoiner(",")
.add(IntraBrokerDiskCapacityGoal.class.getName())
.add(IntraBrokerDiskUsageDistributionGoal.class.getName())
.toString();
public static final String ANOMALY_DETECTION_INTRA_BROKER_GOALS_DOC = "The intra broker goals that anomaly detector "
+ "should detect if they are violated.";

/**
* <code>self.healing.exclude.recently.demoted.brokers</code>
*/
Expand Down Expand Up @@ -323,6 +355,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_GOAL_VIOLATIONS_CLASS,
ConfigDef.Importance.MEDIUM,
GOAL_VIOLATIONS_CLASS_DOC)
.define(INTRA_BROKER_GOAL_VIOLATIONS_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_INTRA_BROKER_GOAL_VIOLATIONS_CLASS,
ConfigDef.Importance.MEDIUM,
INTRA_BROKER_GOAL_VIOLATIONS_CLASS_DOC)
.define(DISK_FAILURES_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_DISK_FAILURES_CLASS,
Expand All @@ -338,6 +375,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_SELF_HEALING_GOALS,
ConfigDef.Importance.HIGH,
SELF_HEALING_GOALS_DOC)
.define(SELF_HEALING_INTRA_BROKER_GOALS_CONFIG,
ConfigDef.Type.LIST,
DEFAULT_SELF_HEALING_INTRA_BROKER_GOALS,
ConfigDef.Importance.HIGH,
SELF_HEALING_INTRA_BROKER_GOALS_DOC)
.define(ANOMALY_NOTIFIER_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_ANOMALY_NOTIFIER_CLASS,
Expand All @@ -348,6 +390,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_ANOMALY_DETECTION_GOALS,
ConfigDef.Importance.MEDIUM,
ANOMALY_DETECTION_GOALS_DOC)
.define(ANOMALY_DETECTION_INTRA_BROKER_GOALS_CONFIG,
ConfigDef.Type.LIST,
DEFAULT_ANOMALY_DETECTION_INTRA_BROKER_GOALS,
ConfigDef.Importance.MEDIUM,
ANOMALY_DETECTION_INTRA_BROKER_GOALS_DOC)
.define(SELF_HEALING_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG,
ConfigDef.Type.BOOLEAN,
DEFAULT_SELF_HEALING_EXCLUDE_RECENT_BROKERS_CONFIG,
Expand Down
Loading