diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml
index 7c4612822..fea42ab6b 100644
--- a/.github/workflows/e2e_tests.yaml
+++ b/.github/workflows/e2e_tests.yaml
@@ -64,6 +64,8 @@ jobs:
 
       - name: Setup and start KinD cluster
         uses: ./common/github-actions/kind
+        with:
+          worker-nodes: 1
 
       - name: Install NVidia GPU operator for KinD
         uses: ./common/github-actions/nvidia-gpu-operator
@@ -102,6 +104,8 @@ jobs:
           kubectl create clusterrolebinding sdk-user-localqueue-creator --clusterrole=localqueue-creator --user=sdk-user
           kubectl create clusterrole list-secrets --verb=get,list --resource=secrets
           kubectl create clusterrolebinding sdk-user-list-secrets --clusterrole=list-secrets --user=sdk-user
+          kubectl create clusterrole pod-creator --verb=get,list --resource=pods
+          kubectl create clusterrolebinding sdk-user-pod-creator --clusterrole=pod-creator --user=sdk-user
           kubectl config use-context sdk-user
 
       - name: Run e2e tests
diff --git a/tests/e2e/heterogeneous_clusters_kind_test.py b/tests/e2e/heterogeneous_clusters_kind_test.py
new file mode 100644
index 000000000..8f814a7c4
--- /dev/null
+++ b/tests/e2e/heterogeneous_clusters_kind_test.py
@@ -0,0 +1,74 @@
+from time import sleep
+import time
+from codeflare_sdk import (
+    Cluster,
+    ClusterConfiguration,
+)
+
+from codeflare_sdk.common.kueue.kueue import list_local_queues
+
+import pytest
+
+from support import *
+
+
+@pytest.mark.skip(reason="Skipping heterogeneous cluster kind test")
+@pytest.mark.kind
+class TestHeterogeneousClustersKind:
+    def setup_method(self):
+        initialize_kubernetes_client(self)
+
+    def teardown_method(self):
+        delete_namespace(self)
+        delete_kueue_resources(self)
+
+    @pytest.mark.nvidia_gpu
+    def test_heterogeneous_clusters(self):
+        create_namespace(self)
+        create_kueue_resources(self, 2)
+        self.run_heterogeneous_clusters()
+
+    def run_heterogeneous_clusters(
+        self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
+    ):
+        for flavor in self.resource_flavors:
+            node_labels = (
+                get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
+            )
+            expected_nodes = get_nodes_by_label(self, node_labels)
+
+            print(f"Expected nodes: {expected_nodes}")
+            cluster_name = f"test-ray-cluster-li-{flavor[-5:]}"
+            queues = list_local_queues(namespace=self.namespace, flavors=[flavor])
+            queue_name = queues[0]["name"] if queues else None
+            print(f"Using flavor: {flavor}, Queue: {queue_name}")
+            cluster = Cluster(
+                ClusterConfiguration(
+                    name=cluster_name,
+                    namespace=self.namespace,
+                    num_workers=1,
+                    head_cpu_requests="500m",
+                    head_cpu_limits="500m",
+                    head_memory_requests=2,
+                    head_memory_limits=2,
+                    worker_cpu_requests="500m",
+                    worker_cpu_limits=1,
+                    worker_memory_requests=1,
+                    worker_memory_limits=4,
+                    worker_extended_resource_requests={
+                        gpu_resource_name: number_of_gpus
+                    },
+                    write_to_file=True,
+                    verify_tls=False,
+                    local_queue=queue_name,
+                )
+            )
+            cluster.up()
+            sleep(5)
+            node_name = get_pod_node(self, self.namespace, cluster_name)
+            print(f"Cluster {cluster_name}-{flavor} is running on node: {node_name}")
+            sleep(5)
+            assert (
+                node_name in expected_nodes
+            ), f"Node {node_name} is not in the expected nodes for flavor {flavor}."
+            cluster.down()
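
Note: the placement assertion in the test above reduces to "the node hosting the
RayCluster's pod must be one of the nodes matching the flavor's nodeLabels". A
minimal standalone sketch of that check, assuming a reachable kubeconfig and the
ray.io/cluster pod label that get_pod_node (added in support.py below) relies on;
the namespace, node label, and cluster name here are illustrative, not taken from
this PR:

    from kubernetes import client, config

    config.load_kube_config()
    v1 = client.CoreV1Api()

    # Nodes carrying the flavor's nodeLabels (here: the default flavor's worker label)
    expected = {n.metadata.name for n in v1.list_node(label_selector="worker-1=true").items}

    # Node actually hosting a pod of the Ray cluster
    pods = v1.list_namespaced_pod(
        "test-ns", label_selector="ray.io/cluster=test-ray-cluster-li-abcde"
    )
    assert pods.items and pods.items[0].spec.node_name in expected
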
diff --git a/tests/e2e/heterogeneous_clusters_oauth_test.py b/tests/e2e/heterogeneous_clusters_oauth_test.py
new file mode 100644
index 000000000..4a7a687cf
--- /dev/null
+++ b/tests/e2e/heterogeneous_clusters_oauth_test.py
@@ -0,0 +1,77 @@
+from time import sleep
+import time
+from codeflare_sdk import (
+    Cluster,
+    ClusterConfiguration,
+    TokenAuthentication,
+)
+
+from codeflare_sdk.common.kueue.kueue import list_local_queues
+
+import pytest
+
+from support import *
+
+
+@pytest.mark.openshift
+class TestHeterogeneousClustersOauth:
+    def setup_method(self):
+        initialize_kubernetes_client(self)
+
+    def teardown_method(self):
+        delete_namespace(self)
+        delete_kueue_resources(self)
+
+    def test_heterogeneous_clusters(self):
+        create_namespace(self)
+        create_kueue_resources(self, 2)
+        self.run_heterogeneous_clusters()
+
+    def run_heterogeneous_clusters(
+        self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
+    ):
+        ray_image = get_ray_image()
+
+        auth = TokenAuthentication(
+            token=run_oc_command(["whoami", "--show-token=true"]),
+            server=run_oc_command(["whoami", "--show-server=true"]),
+            skip_tls=True,
+        )
+        auth.login()
+
+        for flavor in self.resource_flavors:
+            node_labels = (
+                get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
+            )
+            expected_nodes = get_nodes_by_label(self, node_labels)
+
+            print(f"Expected nodes: {expected_nodes}")
+            cluster_name = f"test-ray-cluster-li-{flavor[-5:]}"
+            queues = list_local_queues(namespace=self.namespace, flavors=[flavor])
+            queue_name = queues[0]["name"] if queues else None
+            print(f"Using flavor: {flavor}, Queue: {queue_name}")
+            cluster = Cluster(
+                ClusterConfiguration(
+                    namespace=self.namespace,
+                    name=cluster_name,
+                    num_workers=1,
+                    head_cpu_requests="500m",
+                    head_cpu_limits="500m",
+                    worker_cpu_requests="500m",
+                    worker_cpu_limits=1,
+                    worker_memory_requests=1,
+                    worker_memory_limits=4,
+                    image=ray_image,
+                    verify_tls=False,
+                    local_queue=queue_name,
+                )
+            )
+            cluster.up()
+            sleep(5)
+            node_name = get_pod_node(self, self.namespace, cluster_name)
+            print(f"Cluster {cluster_name}-{flavor} is running on node: {node_name}")
+            sleep(5)
+            assert (
+                node_name in expected_nodes
+            ), f"Node {node_name} is not in the expected nodes for flavor {flavor}."
+            cluster.down()
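
Note: both tests only rely on list_local_queues returning, for a given flavor, the
LocalQueues whose ClusterQueue draws from that flavor, as dicts carrying at least a
"name" key. Distilled, the per-flavor queue selection is (namespace and flavor name
illustrative):

    from codeflare_sdk.common.kueue.kueue import list_local_queues

    queues = list_local_queues(namespace="test-ns", flavors=["test-resource-flavor-abcde"])
    # With no match this yields None, presumably deferring to the LocalQueue
    # annotated kueue.x-k8s.io/default-queue (see create_local_queue below).
    queue_name = queues[0]["name"] if queues else None
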
diff --git a/tests/e2e/support.py b/tests/e2e/support.py
index 6fdd03af9..604884668 100644
--- a/tests/e2e/support.py
+++ b/tests/e2e/support.py
@@ -1,3 +1,4 @@
+import json
 import os
 import random
 import string
@@ -65,19 +66,30 @@ def create_namespace(self):
         return RuntimeError(e)
 
 
-def create_new_resource_flavor(self):
-    self.resource_flavor = f"test-resource-flavor-{random_choice()}"
-    create_resource_flavor(self, self.resource_flavor)
+def create_new_resource_flavor(self, num_flavors):
+    self.resource_flavors = []
+    for i in range(num_flavors):
+        default = i < 1
+        resource_flavor = f"test-resource-flavor-{random_choice()}"
+        create_resource_flavor(self, resource_flavor, default)
+        self.resource_flavors.append(resource_flavor)
 
 
-def create_new_cluster_queue(self):
-    self.cluster_queue = f"test-cluster-queue-{random_choice()}"
-    create_cluster_queue(self, self.cluster_queue, self.resource_flavor)
+def create_new_cluster_queue(self, num_queues):
+    self.cluster_queues = []
+    for i in range(num_queues):
+        cluster_queue_name = f"test-cluster-queue-{random_choice()}"
+        create_cluster_queue(self, cluster_queue_name, self.resource_flavors[i])
+        self.cluster_queues.append(cluster_queue_name)
 
 
-def create_new_local_queue(self):
-    self.local_queue = f"test-local-queue-{random_choice()}"
-    create_local_queue(self, self.cluster_queue, self.local_queue)
+def create_new_local_queue(self, num_queues):
+    self.local_queues = []
+    for i in range(num_queues):
+        is_default = i == 0
+        local_queue_name = f"test-local-queue-{random_choice()}"
+        create_local_queue(self, self.cluster_queues[i], local_queue_name, is_default)
+        self.local_queues.append(local_queue_name)
 
 
 def create_namespace_with_name(self, namespace_name):
@@ -132,7 +144,7 @@ def create_cluster_queue(self, cluster_queue, flavor):
                             {"name": "memory", "nominalQuota": "36Gi"},
                             {"name": "nvidia.com/gpu", "nominalQuota": 1},
                         ],
-                    }
+                    },
                 ],
             }
         ],
@@ -161,11 +173,33 @@ def create_cluster_queue(self, cluster_queue, flavor):
     self.cluster_queue = cluster_queue
 
 
-def create_resource_flavor(self, flavor):
+def create_resource_flavor(self, flavor, default=True):
+    worker_label, worker_value = os.getenv("WORKER_LABEL", "worker-1=true").split("=")
+    control_label, control_value = os.getenv(
+        "CONTROL_LABEL", "ingress-ready=true"
+    ).split("=")
+    toleration_key = os.getenv(
+        "TOLERATION_KEY", "node-role.kubernetes.io/control-plane"
+    )
+
+    node_labels = (
+        {worker_label: worker_value} if default else {control_label: control_value}
+    )
+
     resource_flavor_json = {
         "apiVersion": "kueue.x-k8s.io/v1beta1",
         "kind": "ResourceFlavor",
         "metadata": {"name": flavor},
+        "spec": {
+            "nodeLabels": node_labels,
+            "tolerations": [
+                {
+                    "key": toleration_key,
+                    "operator": "Exists",
+                    "effect": "NoSchedule",
+                }
+            ],
+        },
     }
 
     try:
@@ -190,14 +224,14 @@ def create_resource_flavor(self, flavor):
     self.resource_flavor = flavor
 
 
-def create_local_queue(self, cluster_queue, local_queue):
+def create_local_queue(self, cluster_queue, local_queue, is_default=True):
     local_queue_json = {
         "apiVersion": "kueue.x-k8s.io/v1beta1",
         "kind": "LocalQueue",
         "metadata": {
             "namespace": self.namespace,
             "name": local_queue,
-            "annotations": {"kueue.x-k8s.io/default-queue": "true"},
+            "annotations": {"kueue.x-k8s.io/default-queue": str(is_default).lower()},
         },
         "spec": {"clusterQueue": cluster_queue},
     }
@@ -226,34 +260,77 @@ def create_local_queue(self, cluster_queue, local_queue):
     self.local_queue = local_queue
 
 
-def create_kueue_resources(self):
+def create_kueue_resources(self, resource_amount=1):
     print("creating Kueue resources ...")
-    create_new_resource_flavor(self)
-    create_new_cluster_queue(self)
-    create_new_local_queue(self)
+    create_new_resource_flavor(self, resource_amount)
+    create_new_cluster_queue(self, resource_amount)
+    create_new_local_queue(self, resource_amount)
 
 
 def delete_kueue_resources(self):
     # Delete if given cluster-queue exists
-    try:
-        self.custom_api.delete_cluster_custom_object(
-            group="kueue.x-k8s.io",
-            plural="clusterqueues",
-            version="v1beta1",
-            name=self.cluster_queue,
-        )
-        print(f"\n'{self.cluster_queue}' cluster-queue deleted")
-    except Exception as e:
-        print(f"\nError deleting cluster-queue '{self.cluster_queue}' : {e}")
+    for cq in self.cluster_queues:
+        try:
+            self.custom_api.delete_cluster_custom_object(
+                group="kueue.x-k8s.io",
+                plural="clusterqueues",
+                version="v1beta1",
+                name=cq,
+            )
+            print(f"\n'{cq}' cluster-queue deleted")
+        except Exception as e:
+            print(f"\nError deleting cluster-queue '{cq}' : {e}")
 
     # Delete if given resource-flavor exists
+    for flavor in self.resource_flavors:
+        try:
+            self.custom_api.delete_cluster_custom_object(
+                group="kueue.x-k8s.io",
+                plural="resourceflavors",
+                version="v1beta1",
+                name=flavor,
+            )
+            print(f"'{flavor}' resource-flavor deleted")
+        except Exception as e:
+            print(f"\nError deleting resource-flavor '{flavor}': {e}")
+
+
+def get_pod_node(self, namespace, name):
+    label_selector = f"ray.io/cluster={name}"
+    pods = self.api_instance.list_namespaced_pod(
+        namespace, label_selector=label_selector
+    )
+    if not pods.items:
+        raise ValueError(
+            f"Unable to retrieve node name for pod '{name}' in namespace '{namespace}'"
+        )
+    pod = pods.items[0]
+    node_name = pod.spec.node_name
+    if node_name is None:
+        raise ValueError(
+            f"No node selected for pod '{name}' in namespace '{namespace}'"
+        )
+    return node_name
+
+
+def get_flavor_spec(self, flavor_name):
     try:
-        self.custom_api.delete_cluster_custom_object(
+        flavor = self.custom_api.get_cluster_custom_object(
             group="kueue.x-k8s.io",
-            plural="resourceflavors",
             version="v1beta1",
-            name=self.resource_flavor,
+            plural="resourceflavors",
+            name=flavor_name,
         )
-        print(f"'{self.resource_flavor}' resource-flavor deleted")
-    except Exception as e:
-        print(f"\nError deleting resource-flavor '{self.resource_flavor}' : {e}")
+        return flavor
+    except client.exceptions.ApiException as e:
+        if e.status == 404:
+            print(f"ResourceFlavor '{flavor_name}' not found.")
+        else:
+            print(f"Error retrieving ResourceFlavor '{flavor_name}': {e}")
+        raise
+
+
+def get_nodes_by_label(self, node_labels):
+    label_selector = ",".join(f"{k}={v}" for k, v in node_labels.items())
+    nodes = self.api_instance.list_node(label_selector=label_selector)
+    return [node.metadata.name for node in nodes.items]
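
Note: create_resource_flavor pins the default flavor to nodes matching WORKER_LABEL
and every non-default flavor to nodes matching CONTROL_LABEL, adding a toleration
keyed by TOLERATION_KEY so the control-plane node stays schedulable. A hypothetical
local override before running the tests (the values shown are the defaults already
baked into support.py):

    import os

    os.environ["WORKER_LABEL"] = "worker-1=true"        # nodes for the default flavor
    os.environ["CONTROL_LABEL"] = "ingress-ready=true"  # nodes for non-default flavors
    os.environ["TOLERATION_KEY"] = "node-role.kubernetes.io/control-plane"
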