e2e test for heterogeneous cluster
Ygnas committed Oct 29, 2024
1 parent 26d24f2 commit 01e5964
Showing 4 changed files with 265 additions and 33 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/e2e_tests.yaml
@@ -64,6 +64,8 @@ jobs:

- name: Setup and start KinD cluster
  uses: ./common/github-actions/kind
  with:
    worker-nodes: 1

- name: Install NVidia GPU operator for KinD
  uses: ./common/github-actions/nvidia-gpu-operator
@@ -102,6 +104,8 @@ jobs:
kubectl create clusterrolebinding sdk-user-localqueue-creator --clusterrole=localqueue-creator --user=sdk-user
kubectl create clusterrole list-secrets --verb=get,list --resource=secrets
kubectl create clusterrolebinding sdk-user-list-secrets --clusterrole=list-secrets --user=sdk-user
kubectl create clusterrole pod-creator --verb=get,list --resource=pods
kubectl create clusterrolebinding sdk-user-pod-creator --clusterrole=pod-creator --user=sdk-user
kubectl config use-context sdk-user
- name: Run e2e tests
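Two distinctly labelled nodes are what make the heterogeneous test possible: the support helpers below default to selecting `worker-1=true` for the first flavor and `ingress-ready=true` (the KinD control-plane label) for the second, which is why the workflow now asks the KinD action for an extra worker node. A minimal sketch, not part of the commit, using the `kubernetes` Python client and assuming a kubeconfig pointing at the KinD cluster and a worker node labelled `worker-1=true`:

# Sketch only: check that each label the default flavors select
# resolves to at least one node in the KinD cluster.
from kubernetes import client, config

config.load_kube_config()  # assumes a kubeconfig for the KinD cluster
v1 = client.CoreV1Api()

for selector in ("worker-1=true", "ingress-ready=true"):
    nodes = v1.list_node(label_selector=selector)
    print(selector, "->", [n.metadata.name for n in nodes.items])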
74 changes: 74 additions & 0 deletions tests/e2e/heterogeneous_clusters_kind_test.py
@@ -0,0 +1,74 @@
from time import sleep

from codeflare_sdk import (
    Cluster,
    ClusterConfiguration,
)

from codeflare_sdk.common.kueue.kueue import list_local_queues

import pytest

from support import *


@pytest.mark.skip(reason="Skipping heterogeneous cluster kind test")
@pytest.mark.kind
class TestHeterogeneousClustersKind:
    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)
        delete_kueue_resources(self)

    @pytest.mark.nvidia_gpu
    def test_heterogeneous_clusters(self):
        create_namespace(self)
        create_kueue_resources(self, 2)
        self.run_heterogeneous_clusters()

    def run_heterogeneous_clusters(
        self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
    ):
        for flavor in self.resource_flavors:
            node_labels = (
                get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
            )
            expected_nodes = get_nodes_by_label(self, node_labels)

            print(f"Expected nodes: {expected_nodes}")
            cluster_name = f"test-ray-cluster-li-{flavor[-5:]}"
            queues = list_local_queues(namespace=self.namespace, flavors=[flavor])
            queue_name = queues[0]["name"] if queues else None
            print(f"Using flavor: {flavor}, Queue: {queue_name}")
            cluster = Cluster(
                ClusterConfiguration(
                    name=cluster_name,
                    namespace=self.namespace,
                    num_workers=1,
                    head_cpu_requests="500m",
                    head_cpu_limits="500m",
                    head_memory_requests=2,
                    head_memory_limits=2,
                    worker_cpu_requests="500m",
                    worker_cpu_limits=1,
                    worker_memory_requests=1,
                    worker_memory_limits=4,
                    worker_extended_resource_requests={
                        gpu_resource_name: number_of_gpus
                    },
                    write_to_file=True,
                    verify_tls=False,
                    local_queue=queue_name,
                )
            )
            cluster.up()
            sleep(5)
            node_name = get_pod_node(self, self.namespace, cluster_name)
            print(f"Cluster {cluster_name}-{flavor} is running on node: {node_name}")
            sleep(5)
            assert (
                node_name in expected_nodes
            ), f"Node {node_name} is not in the expected nodes for flavor {flavor}."
            cluster.down()
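The queue lookup in the loop above depends only on `list_local_queues` returning a list of dicts with a `"name"` key for each LocalQueue matching the given flavor. A hypothetical call, with placeholder names, showing the shape the test assumes:

# Hypothetical names; the dict structure is what the test relies on.
queues = list_local_queues(
    namespace="test-ns-abcde", flavors=["test-resource-flavor-fghij"]
)
# e.g. [{"name": "test-local-queue-klmno", ...}]
queue_name = queues[0]["name"] if queues else None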
77 changes: 77 additions & 0 deletions tests/e2e/heterogeneous_clusters_oauth_test.py
@@ -0,0 +1,77 @@
from time import sleep

from codeflare_sdk import (
    Cluster,
    ClusterConfiguration,
    TokenAuthentication,
)

from codeflare_sdk.common.kueue.kueue import list_local_queues

import pytest

from support import *


@pytest.mark.openshift
class TestHeterogeneousClustersOauth:
    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_heterogeneous_clusters(self):
        create_namespace(self)
        create_kueue_resources(self, 2)
        self.run_heterogeneous_clusters()

    def run_heterogeneous_clusters(
        self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
    ):
        ray_image = get_ray_image()

        auth = TokenAuthentication(
            token=run_oc_command(["whoami", "--show-token=true"]),
            server=run_oc_command(["whoami", "--show-server=true"]),
            skip_tls=True,
        )
        auth.login()

        for flavor in self.resource_flavors:
            node_labels = (
                get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
            )
            expected_nodes = get_nodes_by_label(self, node_labels)

            print(f"Expected nodes: {expected_nodes}")
            cluster_name = f"test-ray-cluster-li-{flavor[-5:]}"
            queues = list_local_queues(namespace=self.namespace, flavors=[flavor])
            queue_name = queues[0]["name"] if queues else None
            print(f"Using flavor: {flavor}, Queue: {queue_name}")
            cluster = Cluster(
                ClusterConfiguration(
                    namespace=self.namespace,
                    name=cluster_name,
                    num_workers=1,
                    head_cpu_requests="500m",
                    head_cpu_limits="500m",
                    worker_cpu_requests="500m",
                    worker_cpu_limits=1,
                    worker_memory_requests=1,
                    worker_memory_limits=4,
                    image=ray_image,
                    verify_tls=False,
                    local_queue=queue_name,
                )
            )
            cluster.up()
            sleep(5)
            node_name = get_pod_node(self, self.namespace, cluster_name)
            print(f"Cluster {cluster_name}-{flavor} is running on node: {node_name}")
            sleep(5)
            assert (
                node_name in expected_nodes
            ), f"Node {node_name} is not in the expected nodes for flavor {flavor}."
            cluster.down()
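`run_oc_command` comes from the shared e2e test helpers and is not part of this diff; the token and server lookups above assume it behaves roughly like this sketch, shelling out to `oc` and returning trimmed stdout:

# Assumed behavior only; the real helper lives in the e2e support code.
import subprocess

def run_oc_command(args):
    result = subprocess.run(
        ["oc"] + args, capture_output=True, text=True, check=True
    )
    return result.stdout.strip()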
143 changes: 110 additions & 33 deletions tests/e2e/support.py
@@ -1,3 +1,4 @@
import json
import os
import random
import string
@@ -65,19 +66,30 @@ def create_namespace(self):
        return RuntimeError(e)


def create_new_resource_flavor(self, num_flavors):
    self.resource_flavors = []
    for i in range(num_flavors):
        default = i < 1
        resource_flavor = f"test-resource-flavor-{random_choice()}"
        create_resource_flavor(self, resource_flavor, default)
        self.resource_flavors.append(resource_flavor)


def create_new_cluster_queue(self, num_queues):
    self.cluster_queues = []
    for i in range(num_queues):
        cluster_queue_name = f"test-cluster-queue-{random_choice()}"
        create_cluster_queue(self, cluster_queue_name, self.resource_flavors[i])
        self.cluster_queues.append(cluster_queue_name)


def create_new_local_queue(self, num_queues):
    self.local_queues = []
    for i in range(num_queues):
        is_default = i == 0
        local_queue_name = f"test-local-queue-{random_choice()}"
        create_local_queue(self, self.cluster_queues[i], local_queue_name, is_default)
        self.local_queues.append(local_queue_name)


def create_namespace_with_name(self, namespace_name):
@@ -132,7 +144,7 @@ def create_cluster_queue(self, cluster_queue, flavor):
                            {"name": "memory", "nominalQuota": "36Gi"},
                            {"name": "nvidia.com/gpu", "nominalQuota": 1},
                        ],
                    },
                ],
            }
        ],
@@ -161,11 +173,33 @@ def create_cluster_queue(self, cluster_queue, flavor):
    self.cluster_queue = cluster_queue


def create_resource_flavor(self, flavor, default=True):
    worker_label, worker_value = os.getenv("WORKER_LABEL", "worker-1=true").split("=")
    control_label, control_value = os.getenv(
        "CONTROL_LABEL", "ingress-ready=true"
    ).split("=")
    toleration_key = os.getenv(
        "TOLERATION_KEY", "node-role.kubernetes.io/control-plane"
    )

    node_labels = (
        {worker_label: worker_value} if default else {control_label: control_value}
    )

    resource_flavor_json = {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "kind": "ResourceFlavor",
        "metadata": {"name": flavor},
        "spec": {
            "nodeLabels": node_labels,
            "tolerations": [
                {
                    "key": toleration_key,
                    "operator": "Exists",
                    "effect": "NoSchedule",
                }
            ],
        },
    }

    try:
@@ -190,14 +224,14 @@ def create_resource_flavor(self, flavor):
    self.resource_flavor = flavor


def create_local_queue(self, cluster_queue, local_queue, is_default=True):
    local_queue_json = {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "kind": "LocalQueue",
        "metadata": {
            "namespace": self.namespace,
            "name": local_queue,
            "annotations": {"kueue.x-k8s.io/default-queue": str(is_default).lower()},
        },
        "spec": {"clusterQueue": cluster_queue},
    }
@@ -226,34 +260,77 @@ def create_local_queue(self, cluster_queue, local_queue):
    self.local_queue = local_queue


def create_kueue_resources(self, resource_amount=1):
    print("creating Kueue resources ...")
    create_new_resource_flavor(self, resource_amount)
    create_new_cluster_queue(self, resource_amount)
    create_new_local_queue(self, resource_amount)


def delete_kueue_resources(self):
    # Delete if given cluster-queue exists
    for cq in self.cluster_queues:
        try:
            self.custom_api.delete_cluster_custom_object(
                group="kueue.x-k8s.io",
                plural="clusterqueues",
                version="v1beta1",
                name=cq,
            )
            print(f"\n'{cq}' cluster-queue deleted")
        except Exception as e:
            print(f"\nError deleting cluster-queue '{cq}' : {e}")

    # Delete if given resource-flavor exists
    for flavor in self.resource_flavors:
        try:
            self.custom_api.delete_cluster_custom_object(
                group="kueue.x-k8s.io",
                plural="resourceflavors",
                version="v1beta1",
                name=flavor,
            )
            print(f"'{flavor}' resource-flavor deleted")
        except Exception as e:
            print(f"\nError deleting resource-flavor '{flavor}': {e}")


def get_pod_node(self, namespace, name):
    label_selector = f"ray.io/cluster={name}"
    pods = self.api_instance.list_namespaced_pod(
        namespace, label_selector=label_selector
    )
    if not pods.items:
        raise ValueError(
            f"Unable to retrieve node name for pod '{name}' in namespace '{namespace}'"
        )
    pod = pods.items[0]
    node_name = pod.spec.node_name
    if node_name is None:
        raise ValueError(
            f"No node selected for pod '{name}' in namespace '{namespace}'"
        )
    return node_name


def get_flavor_spec(self, flavor_name):
    try:
        flavor = self.custom_api.get_cluster_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            plural="resourceflavors",
            name=flavor_name,
        )
        return flavor
    except client.exceptions.ApiException as e:
        if e.status == 404:
            print(f"ResourceFlavor '{flavor_name}' not found.")
        else:
            print(f"Error retrieving ResourceFlavor '{flavor_name}': {e}")
        raise


def get_nodes_by_label(self, node_labels):
    label_selector = ",".join(f"{k}={v}" for k, v in node_labels.items())
    nodes = self.api_instance.list_node(label_selector=label_selector)
    return [node.metadata.name for node in nodes.items]
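`get_nodes_by_label` ANDs every nodeLabel from the flavor spec into one comma-separated selector, for example (illustrative labels):

# Illustrative only:
node_labels = {"worker-1": "true", "kubernetes.io/os": "linux"}
label_selector = ",".join(f"{k}={v}" for k, v in node_labels.items())
# -> "worker-1=true,kubernetes.io/os=linux"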
