Skip to content

Commit

Permalink
WIP: Add affinity for instance category to job
Browse files Browse the repository at this point in the history
  • Loading branch information
acopar committed Sep 5, 2024
1 parent 5e5034f commit abf645d
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions resolwe/flow/managers/workload_connectors/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,118 @@ def optimize_job_scheduling(self, data: Data, job_description: dict):
}
}

def append_node_affinity(self, job_description: dict, process_requirements: dict):
"""
Appends nodeAffinity based on CPU and memory ratio to an existing job description if instance_types are defined
within FLOW_KUBERNETES_AFFINITY.
:param job_description: dict containing the job spec where the affinity will be appended.
:param process_requirements: dict containing 'cpu' and 'memory' requirements of the process.
:return: None (modifies job_description in place).
"""

# Extract kubernetes_affinity and instance_types from settings
if kubernetes_affinity := getattr(settings, "FLOW_KUBERNETES_AFFINITY", None):
instance_types = kubernetes_affinity.get("instance_types", {})
else:
instance_types = {}

# Proceed only if instance_types is not empty
if not instance_types:
return

# Consider a batch of ten same-sized processes with requirements 8 vCPU and 34 GB.
# non-discrete strategy would allocate it to general-purpose nodes,
# because the ratio is 4.25. This makes these nodes CPU under-utilized, costing $8/h.
# Discrete strategy will use memory-optimized nodes, costing $5.3/h.
# Set this to False if you expect processes with < 3.75 ratio at least 70%
# as frequently as there are 4.25 ratio, so the rest of the CPUs can be utilized.
use_discrete_instance_strategy = True

best_match_category = None
lowest_process_hourly_rate = float('inf')

process_cpu = process_requirements['cpu']
process_memory = process_requirements['memory']
# Don't invert this, it's not equivalent. The midpoint in geometric sequence is
# not the same as in linear sequence: 6=4+8/2, but 1/6!=(1/4+1/8)/2
process_ratio = process_memory / process_cpu

# Sort instance types by hourly_rate to prefer cheaper options.
# This should avoid spawning 128GB instance type for a single 32GB
# process, but support larger instances when needed. This is not fully
# optimized if smaller instance types are available, since it would
# create a lot of instance fragmentation. Smaller nodes should have
# at least 8 cpus in this configuration.
sorted_instance_types = sorted(instance_types.items(), key=lambda x: x[1].get("hourly_rate", float('inf')))

for instance_type, attributes in sorted_instance_types:
instance_cpu = int(attributes['cpu'])
instance_memory = int(attributes['memory'])
instance_category = attributes['instance-category']
instance_hourly_rate = float(attributes['hourly_rate'])

# Skip instance types that don't meet the process requirements
# This needs to be here, since some instance category may not have
# a sufficiently large instance type available.
if instance_cpu < process_cpu or instance_memory < process_memory:
continue

# Calculate ratio of CPU to memory for the process and the instance
instance_ratio = instance_memory / instance_cpu

# How many processes of this type would fit on the instance?
if use_discrete_instance_strategy:
process_instance_density = min(instance_cpu//process_cpu, instance_memory//process_memory)
# In practice this should be equivalent to:
# compute-optimized for ratio <= 2.0
# general purpose for 2.0 < ratio <= 4.0
# memory-optimized for ratio > 4.0

else:
process_instance_density = min(instance_cpu/process_cpu, instance_memory/process_memory)
# In practice this should be equivalent to:
# compute-optimized for ratio <~ 2.25
# general purpose for 2.25 <~ ratio <= 5.25
# memory-optimized for ratio > 5.25

process_hourly_rate = instance_hourly_rate / process_instance_density

if process_hourly_rate < lowest_process_hourly_rate:
lowest_process_hourly_rate = process_hourly_rate
best_match_category = instance_category

if not best_match_category:
raise ValueError("No suitable instance type found for the given process requirements.")

# Create the new match expression
new_match_expression = {
"key": "instance-category",
"operator": "In",
"values": [best_match_category]
}

# Ensure nodeAffinity structure exists in job_description
if "affinity" not in job_description:
job_description["affinity"] = {}

if "nodeAffinity" not in job_description["affinity"]:
job_description["affinity"]["nodeAffinity"] = {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": []
}
}

node_affinity = job_description["affinity"]["nodeAffinity"]
node_selector_terms = node_affinity["requiredDuringSchedulingIgnoredDuringExecution"]["nodeSelectorTerms"]

# If there are no existing terms, create one
if not node_selector_terms:
node_selector_terms.append({"matchExpressions": []})

# Append the new match expression to the first nodeSelectorTerm's matchExpressions
node_selector_terms[0]["matchExpressions"].append(new_match_expression)

def _get_overcommit_factors(self, data: Data) -> dict:
"""Get the overcommit settings for CPU and memory.
Expand Down Expand Up @@ -680,6 +792,11 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
requests["cpu"] = limits["cores"] * overcommit_factors["cpu"]
limits["cpu"] = limits.pop("cores") + 1

process_requirements = {
"cpu": limits["cpu"],
"memory": limits["memory"]
}

# The memory in the database is stored in megabytes but the kubertenes
# requires memory in bytes.
# We request less memory than stored in the database and set limit at 10% more
Expand Down Expand Up @@ -867,6 +984,19 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
},
}
self.optimize_job_scheduling(data, job_description)
try:
self.append_node_affinity(job_description, process_requirements)
except Exception as e:
# This operation is not critical for the job to run, so we log the
# error and continue. The job will still run, but the node affinity
# may be sub-optimal.
logger.exception(
__(
"Kubernetes instance category affinity assignemt failed: '{}'.",
error,
)
)

start_time = time.time()

processing_name = constants.PROCESSING_VOLUME_NAME
Expand Down

0 comments on commit abf645d

Please sign in to comment.