diff --git a/resolwe/flow/managers/workload_connectors/kubernetes.py b/resolwe/flow/managers/workload_connectors/kubernetes.py index 049da1ba7..01b2a46f1 100644 --- a/resolwe/flow/managers/workload_connectors/kubernetes.py +++ b/resolwe/flow/managers/workload_connectors/kubernetes.py @@ -619,6 +619,118 @@ def optimize_job_scheduling(self, data: Data, job_description: dict): } } + def append_node_affinity(self, job_description: dict, process_requirements: dict): + """ + Appends nodeAffinity based on CPU and memory ratio to an existing job description if instance_types are defined + within FLOW_KUBERNETES_AFFINITY. + + :param job_description: dict containing the job spec where the affinity will be appended. + :param process_requirements: dict containing 'cpu' and 'memory' requirements of the process. + :return: None (modifies job_description in place). + """ + + # Extract kubernetes_affinity and instance_types from settings + if kubernetes_affinity := getattr(settings, "FLOW_KUBERNETES_AFFINITY", None): + instance_types = kubernetes_affinity.get("instance_types", {}) + else: + instance_types = {} + + # Proceed only if instance_types is not empty + if not instance_types: + return + + # Consider a batch of ten same-sized processes with requirements 8 vCPU and 34 GB. + # non-discrete strategy would allocate it to general-purpose nodes, + # because the ratio is 4.25. This makes these nodes CPU under-utilized, costing $8/h. + # Discrete strategy will use memory-optimized nodes, costing $5.3/h. + # Set this to False if you expect processes with < 3.75 ratio at least 70% + # as frequently as there are 4.25 ratio, so the rest of the CPUs can be utilized. + use_discrete_instance_strategy = True + + best_match_category = None + lowest_process_hourly_rate = float('inf') + + process_cpu = process_requirements['cpu'] + process_memory = process_requirements['memory'] + # Don't invert this, it's not equivalent. The midpoint in geometric sequence is + # not the same as in linear sequence: 6=4+8/2, but 1/6!=(1/4+1/8)/2 + process_ratio = process_memory / process_cpu + + # Sort instance types by hourly_rate to prefer cheaper options. + # This should avoid spawning 128GB instance type for a single 32GB + # process, but support larger instances when needed. This is not fully + # optimized if smaller instance types are available, since it would + # create a lot of instance fragmentation. Smaller nodes should have + # at least 8 cpus in this configuration. + sorted_instance_types = sorted(instance_types.items(), key=lambda x: x[1].get("hourly_rate", float('inf'))) + + for instance_type, attributes in sorted_instance_types: + instance_cpu = int(attributes['cpu']) + instance_memory = int(attributes['memory']) + instance_category = attributes['instance-category'] + instance_hourly_rate = float(attributes['hourly_rate']) + + # Skip instance types that don't meet the process requirements + # This needs to be here, since some instance category may not have + # a sufficiently large instance type available. + if instance_cpu < process_cpu or instance_memory < process_memory: + continue + + # Calculate ratio of CPU to memory for the process and the instance + instance_ratio = instance_memory / instance_cpu + + # How many processes of this type would fit on the instance? + if use_discrete_instance_strategy: + process_instance_density = min(instance_cpu//process_cpu, instance_memory//process_memory) + # In practice this should be equivalent to: + # compute-optimized for ratio <= 2.0 + # general purpose for 2.0 < ratio <= 4.0 + # memory-optimized for ratio > 4.0 + + else: + process_instance_density = min(instance_cpu/process_cpu, instance_memory/process_memory) + # In practice this should be equivalent to: + # compute-optimized for ratio <~ 2.25 + # general purpose for 2.25 <~ ratio <= 5.25 + # memory-optimized for ratio > 5.25 + + process_hourly_rate = instance_hourly_rate / process_instance_density + + if process_hourly_rate < lowest_process_hourly_rate: + lowest_process_hourly_rate = process_hourly_rate + best_match_category = instance_category + + if not best_match_category: + raise ValueError("No suitable instance type found for the given process requirements.") + + # Create the new match expression + new_match_expression = { + "key": "instance-category", + "operator": "In", + "values": [best_match_category] + } + + # Ensure nodeAffinity structure exists in job_description + if "affinity" not in job_description: + job_description["affinity"] = {} + + if "nodeAffinity" not in job_description["affinity"]: + job_description["affinity"]["nodeAffinity"] = { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [] + } + } + + node_affinity = job_description["affinity"]["nodeAffinity"] + node_selector_terms = node_affinity["requiredDuringSchedulingIgnoredDuringExecution"]["nodeSelectorTerms"] + + # If there are no existing terms, create one + if not node_selector_terms: + node_selector_terms.append({"matchExpressions": []}) + + # Append the new match expression to the first nodeSelectorTerm's matchExpressions + node_selector_terms[0]["matchExpressions"].append(new_match_expression) + def _get_overcommit_factors(self, data: Data) -> dict: """Get the overcommit settings for CPU and memory. @@ -680,6 +792,11 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]): requests["cpu"] = limits["cores"] * overcommit_factors["cpu"] limits["cpu"] = limits.pop("cores") + 1 + process_requirements = { + "cpu": limits["cpu"], + "memory": limits["memory"] + } + # The memory in the database is stored in megabytes but the kubertenes # requires memory in bytes. # We request less memory than stored in the database and set limit at 10% more @@ -867,6 +984,19 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]): }, } self.optimize_job_scheduling(data, job_description) + try: + self.append_node_affinity(job_description, process_requirements) + except Exception as e: + # This operation is not critical for the job to run, so we log the + # error and continue. The job will still run, but the node affinity + # may be sub-optimal. + logger.exception( + __( + "Kubernetes instance category affinity assignemt failed: '{}'.", + error, + ) + ) + start_time = time.time() processing_name = constants.PROCESSING_VOLUME_NAME