Closed

Commits (29)
b5fb017
Add a pipeline to test job scheduling delay
Feb 17, 2025
dfd527a
Add cl2 configs
Feb 19, 2025
afd28ed
Fixes typo
Feb 19, 2025
cb46bba
Runs in eastus
Feb 19, 2025
1db0d81
Add credential_type
Feb 25, 2025
0b3afd5
Update VMs to v5
Feb 28, 2025
46335a6
Update vm sizes based on availability
Feb 28, 2025
6073fee
Bug fix: validate the correct role tag of a node.
Mar 7, 2025
c1a1b44
Bug fix: correct node validation
Mar 7, 2025
6749859
Bug fix: Handles when node.status.condition is None.
Mar 7, 2025
dab9d18
Bug fix: Sets default status_conditions
Mar 7, 2025
e36fb96
Correct kwok_nodes parameter.
Mar 10, 2025
9469a44
fix node count error cloud:aks
vittoriasalim Mar 11, 2025
ed71b35
fix parameter issue
vittoriasalim Mar 11, 2025
fd354af
fixing parameter issue 2
vittoriasalim Mar 11, 2025
362f96c
fix parameter issue
vittoriasalim Mar 11, 2025
cd4da6c
add function to check why is it flagged
vittoriasalim Mar 11, 2025
8d29445
Combined commits: add job scheduling pipeline, add unit test, configu…
vittoriasalim Apr 29, 2025
52186e9
Merge main to vitto/kwok-cl2
vittoriasalim Apr 29, 2025
2f2eedc
Merge branch 'main' into vitto/kwok-cl2
vittoriasalim Apr 29, 2025
600809d
Correction to unit test, rename tuning set and cli.pym, rename scenar…
vittoriasalim May 1, 2025
c9f6667
Merge branch 'main' into vitto/kwok-cl2
vittoriasalim May 1, 2025
799638d
Correction on unit test
vittoriasalim May 1, 2025
d5475a6
Correction: mktemp is deprecated, convert to mkstemp
vittoriasalim May 1, 2025
a31c61c
Update pipelines/perf-eval/Controller/job-scheduling.yml
vittoriasalim May 1, 2025
091a8ff
Merge branch 'main' into vitto/kwok-cl2
vittoriasalim May 1, 2025
6595878
Update scenarios/perf-eval/job-scheduling/config/job_template.yaml
vittoriasalim May 2, 2025
0274950
Update pipelines/perf-eval/Controller/job-scheduling.yml
vittoriasalim May 2, 2025
12e859a
Yaml file corrections
vittoriasalim May 2, 2025
2 changes: 1 addition & 1 deletion .yamllint
@@ -3,7 +3,6 @@
# Default configuration: https://yamllint.readthedocs.io/en/stable/configuration.html#default-configuration

extends: default

rules:
document-start: disable
document-end: disable
@@ -20,3 +19,4 @@ rules:
ignore: |
modules/python/clusterloader2/**/*.yaml
modules/python/clusterloader2/**/*.yml
scenarios/perf-eval/job-scheduling/**/*.yaml
Collaborator
Those files should be moved to clusterloader2 instead of adding ignore here

Collaborator
Here's the thought process. Previously, cl2 configs lived under the modules folder. The problem with that is that writing a new config for a new pipeline meant copy/pasting a whole folder of code, including not only the config files but the Python files as well, which is very poor in terms of usability. So we propose a new solution: move the config files under scenarios and create a default Python module that can be reused. Creating a new pipeline then no longer requires copying the Python files.
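For orientation, a minimal sketch of the intended reuse (the entry-point names are taken from the unit test imports in this PR; everything else is illustrative):

# A new pipeline only needs its own scenario config under scenarios/;
# the Python side is imported from the shared default module instead of
# being copied per pipeline.
from clusterloader2.default.cli import (
    configure_clusterloader2,  # writes the CL2 override file
    validate_clusterloader2,   # waits until the expected node count is ready
    collect_clusterloader2,    # parses CL2 reports into result records
)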

21 changes: 11 additions & 10 deletions modules/python/clients/kubernetes_client.py
@@ -3,16 +3,17 @@
# https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/#taint-based-evictions
# https://kubernetes.io/docs/reference/labels-annotations-taints/
builtin_taints_keys = [
"node.kubernetes.io/not-ready",
"node.kubernetes.io/unreachable",
"node.kubernetes.io/pid-pressure",
"node.kubernetes.io/out-of-disk",
"node.kubernetes.io/memory-pressure",
"node.kubernetes.io/disk-pressure",
"node.kubernetes.io/network-unavailable",
"node.kubernetes.io/unschedulable",
"node.cloudprovider.kubernetes.io/uninitialized",
"node.cloudprovider.kubernetes.io/shutdown",
"node.kubernetes.io/not-ready",
"node.kubernetes.io/unreachable",
"node.kubernetes.io/pid-pressure",
"node.kubernetes.io/out-of-disk",
"node.kubernetes.io/memory-pressure",
"node.kubernetes.io/disk-pressure",
"node.kubernetes.io/network-unavailable",
"node.kubernetes.io/unschedulable",
"node.cloudprovider.kubernetes.io/uninitialized",
"node.cloudprovider.kubernetes.io/shutdown",
"kwok.x-k8s.io/kwok",
]

class KubernetesClient:
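The practical effect of the new entry: kwok's own taint is treated as a well-known taint. A plausible sketch of how builtin_taints_keys is consumed (get_ready_nodes itself is not shown in this diff, so the helper below is hypothetical):

# Hypothetical helper: a node counts as usable when every taint it carries
# is a well-known builtin taint. Without the "kwok.x-k8s.io/kwok" entry,
# fake kwok nodes would be rejected for carrying an unknown taint.
def has_only_builtin_taints(node) -> bool:
    return all(t.key in builtin_taints_keys for t in (node.spec.taints or []))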
394 changes: 394 additions & 0 deletions modules/python/clusterloader2/default/cli.py

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions modules/python/clusterloader2/utils.py
@@ -13,6 +13,7 @@
NETWORK_METRIC_PREFIXES = ["APIResponsivenessPrometheus", "InClusterNetworkLatency", "NetworkProgrammingLatency"]
PROM_QUERY_PREFIX = "GenericPrometheusQuery"
RESOURCE_USAGE_SUMMARY_PREFIX = "ResourceUsageSummary"
JOB_LIFECYCLE_LATENCY_PREFIX = {"JobLifecycleLatency_JobLifecycleLatency_": "JobLifecycleLatency_JobLifecycleLatency"}
NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX = "NetworkPolicySoakMeasurement"

def run_cl2_command(kubeconfig, cl2_image, cl2_config_dir, cl2_report_dir, provider, cl2_config_file="config.yaml", overrides=False, enable_prometheus=False, tear_down_prometheus=True,
@@ -70,6 +71,10 @@ def get_measurement(file_path):
if file_name.startswith(RESOURCE_USAGE_SUMMARY_PREFIX):
group_name = file_name.split("_")[1]
return RESOURCE_USAGE_SUMMARY_PREFIX, group_name
for file_prefix, measurement in JOB_LIFECYCLE_LATENCY_PREFIX.items():
if file_name.startswith(file_prefix):
group_name = file_name.split("_")[2]
return measurement, group_name
if file_name.startswith(NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX):
group_name = file_name.split("_")[1]
return NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX, group_name
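For context, a short sketch of how the new prefix entry resolves a report file name (the file name below is illustrative, patterned after the mock report added in this PR's tests):

# Illustrative file name; real reports come from the CL2 report directory.
file_name = "JobLifecycleLatency_JobLifecycleLatency_job-scheduling_2025.json"
for file_prefix, measurement in JOB_LIFECYCLE_LATENCY_PREFIX.items():
    if file_name.startswith(file_prefix):
        group_name = file_name.split("_")[2]  # "job-scheduling"
        # get_measurement then returns
        # ("JobLifecycleLatency_JobLifecycleLatency", "job-scheduling")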
New file under modules/python/tests/mock_data/default/report/ (file name not rendered in this view)
@@ -0,0 +1,11 @@
{
"data": {
"Perc50": 78000,
"Perc90": 141000,
"Perc99": 155000
},
"unit": "ms",
"labels": {
"Metric": "create_to_start"
}
}
7 changes: 7 additions & 0 deletions modules/python/tests/mock_data/default/report/junit.xml
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<testsuites>
<testsuite name="ClusterLoaderV2" tests="1" failures="0" errors="0" time="0.123">
<testcase name="JobLifecycleLatency" classname="JobScheduling" time="0.123">
</testcase>
</testsuite>
</testsuites>
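A plausible sketch of how this junit report could map to the "status" field asserted in the unit test below (the actual parsing lives in the default cli.py, which is not rendered here, so this helper is hypothetical):

import xml.etree.ElementTree as ET

# Hypothetical helper: a run counts as a success when the ClusterLoaderV2
# suite reports zero failures and zero errors.
def junit_status(path: str) -> str:
    suite = ET.parse(path).getroot().find("testsuite")
    failed = int(suite.get("failures", "0")) + int(suite.get("errors", "0"))
    return "success" if failed == 0 else "failure"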
183 changes: 183 additions & 0 deletions modules/python/tests/test_default_cli.py
@@ -0,0 +1,183 @@
import json
import os
import tempfile
import unittest
from unittest.mock import patch

from clusterloader2.default.cli import (
collect_clusterloader2,
configure_clusterloader2,
validate_clusterloader2,
)


class TestConfigureClusterLoader2(unittest.TestCase):
def test_configure_clusterloader2(self):
# Create a temporary file for the override file
fd, tmp_path = tempfile.mkstemp()

try:
# Call the function with test data
configure_clusterloader2(
cpu_per_node=2,
node_count=100,
node_per_step=10,
max_pods=40,
repeats=1,
operation_timeout="15m",
provider="azure",
cilium_enabled=False,
scrape_containerd=False,
service_test=True,
cnp_test=False,
ccnp_test=False,
num_cnps=0,
num_ccnps=0,
dualstack=False,
cl2_override_file=tmp_path,
workload_type="job",
job_count=1000,
job_parallelism=1,
job_completions=1,
job_throughput=1000,
)

# Verify the content of the override file
with open(tmp_path, "r", encoding="utf-8") as f:
content = f.read()

# Assert each key-value pair
self.assertIn("CL2_NODES: 100", content)
self.assertIn("CL2_NODES_PER_STEP: 10", content)
self.assertIn("CL2_OPERATION_TIMEOUT: 15m", content)
self.assertIn("CL2_REPEATS: 1", content)
self.assertIn("CL2_STEPS: 10", content)
self.assertIn("CL2_JOBS: 1000", content)
self.assertIn("CL2_JOB_PARALLELISM: 1", content)
self.assertIn("CL2_JOB_COMPLETIONS: 1", content)
self.assertIn("CL2_LOAD_TEST_THROUGHPUT: 1000", content)
self.assertIn("CL2_SERVICE_TEST: true", content)
finally:
os.close(fd)


class TestValidateClusterLoader2(unittest.TestCase):

@patch("clients.kubernetes_client.config.load_kube_config")
@patch("clients.kubernetes_client.KubernetesClient.get_ready_nodes")
def test_validate_clusterloader2_timeout(
self, mock_get_ready_nodes, mock_load_kube_config
):

# kubeconfig is not needed for this test, but it has to be loaded to construct KubernetesClient
mock_load_kube_config.return_value = None
# Mock the KubernetesClient and its get_ready_nodes method
mock_get_ready_nodes.return_value = ["node1"] # Only 1 node ready

# Call the function and expect an exception due to timeout
with self.assertRaises(Exception) as context:
validate_clusterloader2(node_count=2, operation_timeout_in_minutes=1)

# Verify the exception message
self.assertIn(
"Only 1 nodes are ready, expected 2 nodes!", str(context.exception)
)

@patch("clients.kubernetes_client.config.load_kube_config")
@patch("clients.kubernetes_client.KubernetesClient.get_ready_nodes")
def test_validate_clusterloader2_success(
self, mock_get_ready_nodes, mock_load_kube_config
):
mock_load_kube_config.return_value = None
# Mock the KubernetesClient and its get_ready_nodes method
mock_get_ready_nodes.side_effect = [
["node1"], # First call: 1 node ready
["node1", "node2"], # Second call: 2 nodes ready
]

# Call the function with test data
try:
validate_clusterloader2(node_count=2, operation_timeout_in_minutes=1)
except Exception as e:
self.fail(f"validate_clusterloader2 raised an exception unexpectedly: {e}")

# Verify that get_ready_nodes was called at least twice:
# the first call returns 1 node, the second call returns 2 nodes
self.assertGreaterEqual(mock_get_ready_nodes.call_count, 2)


class TestCollectClusterLoader2(unittest.TestCase):
def test_collect_clusterloader2(self):
# Point at the checked-in mock report directory
cl2_report_dir = os.path.join(
os.path.dirname(__file__), "mock_data", "default", "report"
)
# Create a temporary file for result output
fd, result_file = tempfile.mkstemp()

try:
# Call the function with test data
collect_clusterloader2(
cpu_per_node=2,
node_count=100,
max_pods=40,
repeats=1,
cl2_report_dir=cl2_report_dir,
cloud_info=json.dumps({"cloud": "aws"}),
run_id="run123",
run_url="http://example.com/run123",
service_test=True,
cnp_test=False,
ccnp_test=False,
result_file=result_file,
test_type="unit-test",
start_timestamp=None,
workload_type="pod",
job_count=None,
job_parallelism=None,
job_completions=None,
job_throughput=None,
)

# Verify the content of the result file
if os.path.exists(result_file):
with open(result_file, "r", encoding="utf-8") as f:
content = f.read()

# Parse the content as JSON
result_data = json.loads(content)

# Assert each key-value pair
self.assertEqual(result_data["node_count"], 100)
self.assertEqual(result_data["churn_rate"], 1)
self.assertEqual(result_data["status"], "success")
self.assertEqual(result_data["group"], "job-scheduling")
self.assertEqual(
result_data["measurement"],
"JobLifecycleLatency_JobLifecycleLatency",
)

# Assert nested result data
self.assertEqual(result_data["result"]["data"]["Perc50"], 78000)
self.assertEqual(result_data["result"]["data"]["Perc90"], 141000)
self.assertEqual(result_data["result"]["data"]["Perc99"], 155000)
self.assertEqual(result_data["result"]["unit"], "ms")
self.assertEqual(
result_data["result"]["labels"]["Metric"], "create_to_start"
)

# Assert other fields
self.assertEqual(result_data["cloud_info"], '{"cloud": "aws"}')
self.assertEqual(result_data["run_id"], "run123")
self.assertEqual(result_data["run_url"], "http://example.com/run123")
self.assertEqual(result_data["test_type"], "unit-test")
self.assertEqual(result_data["cpu_per_node"], 2)
self.assertEqual(result_data["pod_count"], 4000)
else:
self.fail("Result file does not exist or is empty.")
finally:
os.close(fd)


if __name__ == "__main__":
unittest.main()
66 changes: 66 additions & 0 deletions pipelines/perf-eval/Controller/job-scheduling.yml
@@ -0,0 +1,66 @@
trigger: none
schedules:
- cron: "30 1 */2 * *" # Every 2 days at 1:30 AM
displayName: "1:30 AM every 2 days"
branches:
include:
- main
- vitto/kwok-cl2 # to be removed after the PR is merged
Collaborator
The private branch schedule should be removed before the merge; otherwise, you will have to create a new PR just to remove this branch.

Collaborator
I would suggest leaving this comment unresolved until all the other comments are resolved, and removing this change once the PR is approved.

always: true
variables:
SCENARIO_TYPE: perf-eval
SCENARIO_NAME: job-scheduling
SCENARIO_VERSION: main
stages:
- stage: azure_eastus2
dependsOn: []
jobs:
- template: /jobs/competitive-test.yml
parameters:
cloud: azure
regions:
- eastus2
engine: clusterloader2
engine_input:
image: "ghcr.io/azure/clusterloader2:v20241022"
topology: kwok
matrix:
default:
node_count: 2000 # 2k kwok nodes
job_throughput: 800 # qps
job_count: 20000
cilium_enabled: False
scale_timeout: "1h"
service_test: False
workload_type: "job"
cl2_config_file: config.yaml
max_parallel: 1
timeout_in_minutes: 360
credential_type: service_connection
ssh_key_enabled: false
- stage: aws_eastus2
dependsOn: []
jobs:
- template: /jobs/competitive-test.yml
parameters:
cloud: aws
regions:
- us-east-2
engine: clusterloader2
engine_input:
image: "ghcr.io/azure/clusterloader2:v20241022"
topology: kwok
matrix:
default:
node_count: 2000
job_throughput: 800
job_count: 20000
cilium_enabled: False
scale_timeout: "1h"
service_test: False
workload_type: "job"
cl2_config_file: config.yaml
max_parallel: 1
timeout_in_minutes: 360
credential_type: service_connection
ssh_key_enabled: false
62 changes: 62 additions & 0 deletions scenarios/perf-eval/job-scheduling/config/config.yaml
@@ -0,0 +1,62 @@
name: job-scheduling
Collaborator
This should be moved to the clusterloader2 folder for re-use.


{{$job_count := DefaultParam .CL2_JOBS 20000}}
{{$qps := DefaultParam .CL2_LOAD_TEST_THROUGHPUT 800}}

namespace:
number: 1
prefix: job-scheduling
deleteStaleNamespaces: true
deleteAutomanagedNamespaces: true
enableExistingNamespaces: false

tuningSets:
- name: Uniform{{$qps}}qps
qpsLoad:
qps: {{$qps}}

steps:
- name: Start measurements
measurements:
- Identifier: JobLifecycleLatency
Method: JobLifecycleLatency
Params:
action: start
labelSelector: group=job-scheduling
timeout: 3h
- Identifier: WaitForFinishedJobs
Method: WaitForFinishedJobs
Params:
action: start
labelSelector: group=job-scheduling
timeout: 3h

{{range $i := Loop $job_count}}
- name: Create job {{$i}}
phases:
- namespaceRange:
min: 1
max: 1
replicasPerNamespace: 1
tuningSet: Uniform{{$qps}}qps
objectBundle:
- basename: test-job-{{$i}}
objectTemplatePath: job_template.yaml
templateFillMap:
Group: job-scheduling
{{end}}

- name: Waiting for jobs to be finished
measurements:
- Identifier: WaitForFinishedJobs
Method: WaitForFinishedJobs
Params:
action: gather
timeout: 3h
- name: Collect measurements
measurements:
- Identifier: JobLifecycleLatency
Method: JobLifecycleLatency
Params:
action: gather
timeout: 3h
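Tying the pieces together: the pipeline matrix above feeds the shared default module, which writes the CL2 override file, and the DefaultParam fallbacks in this config (20000 jobs, 800 qps) apply only when no override is set. A sketch of the corresponding call, using the parameter names from the unit test with values from the pipeline matrix (values the pipeline does not set are illustrative):

from clusterloader2.default.cli import configure_clusterloader2

# Values mirror the pipeline matrix above; cpu_per_node, node_per_step,
# max_pods, repeats, and operation_timeout are illustrative, since the
# pipeline does not set them. The override file path is hypothetical.
configure_clusterloader2(
    cpu_per_node=2,
    node_count=2000,  # 2k kwok nodes
    node_per_step=10,
    max_pods=40,
    repeats=1,
    operation_timeout="15m",
    provider="azure",
    cilium_enabled=False,
    scrape_containerd=False,
    service_test=False,
    cnp_test=False,
    ccnp_test=False,
    num_cnps=0,
    num_ccnps=0,
    dualstack=False,
    cl2_override_file="overrides.yaml",  # hypothetical path
    workload_type="job",
    job_count=20000,
    job_parallelism=1,
    job_completions=1,
    job_throughput=800,  # becomes CL2_LOAD_TEST_THROUGHPUT
)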