From 01e59646d2215a2d0a417215b5f026997a707f01 Mon Sep 17 00:00:00 2001
From: Ignas Baranauskas <ibaranau@redhat.com>
Date: Thu, 17 Oct 2024 13:03:47 +0100
Subject: [PATCH] e2e test for heterogeneous clusters

---
 .github/workflows/e2e_tests.yaml              |   4 +
 tests/e2e/heterogeneous_clusters_kind_test.py |  74 +++++++++
 .../e2e/heterogeneous_clusters_oauth_test.py  |  77 ++++++++++
 tests/e2e/support.py                          | 143 ++++++++++++++----
 4 files changed, 265 insertions(+), 33 deletions(-)
 create mode 100644 tests/e2e/heterogeneous_clusters_kind_test.py
 create mode 100644 tests/e2e/heterogeneous_clusters_oauth_test.py

diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml
index 7c4612822..fea42ab6b 100644
--- a/.github/workflows/e2e_tests.yaml
+++ b/.github/workflows/e2e_tests.yaml
@@ -64,6 +64,8 @@ jobs:
 
       - name: Setup and start KinD cluster
         uses: ./common/github-actions/kind
+        with:
+          worker-nodes: 1
 
       - name: Install NVidia GPU operator for KinD
         uses: ./common/github-actions/nvidia-gpu-operator
@@ -102,6 +104,8 @@ jobs:
           kubectl create clusterrolebinding sdk-user-localqueue-creator --clusterrole=localqueue-creator --user=sdk-user
           kubectl create clusterrole list-secrets --verb=get,list --resource=secrets
           kubectl create clusterrolebinding sdk-user-list-secrets --clusterrole=list-secrets --user=sdk-user
+          kubectl create clusterrole pod-reader --verb=get,list --resource=pods
+          kubectl create clusterrolebinding sdk-user-pod-reader --clusterrole=pod-reader --user=sdk-user
           kubectl config use-context sdk-user
 
       - name: Run e2e tests
diff --git a/tests/e2e/heterogeneous_clusters_kind_test.py b/tests/e2e/heterogeneous_clusters_kind_test.py
new file mode 100644
index 000000000..8f814a7c4
--- /dev/null
+++ b/tests/e2e/heterogeneous_clusters_kind_test.py
@@ -0,0 +1,74 @@
+from time import sleep
+import time
+from codeflare_sdk import (
+    Cluster,
+    ClusterConfiguration,
+)
+
+from codeflare_sdk.common.kueue.kueue import list_local_queues
+
+import pytest
+
+from support import *
+
+
+@pytest.mark.skip(reason="Skipping heterogeneous cluster kind test")
+@pytest.mark.kind
+class TestHeterogeneousClustersKind:
+    def setup_method(self):
+        initialize_kubernetes_client(self)
+
+    def teardown_method(self):
+        delete_namespace(self)
+        delete_kueue_resources(self)
+
+    @pytest.mark.nvidia_gpu
+    def test_heterogeneous_clusters(self):
+        create_namespace(self)
+        create_kueue_resources(self, 2)
+        self.run_heterogeneous_clusters()
+
+    def run_heterogeneous_clusters(
+        self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
+    ):
+        for flavor in self.resource_flavors:
+            node_labels = (
+                get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
+            )
+            expected_nodes = get_nodes_by_label(self, node_labels)
+
+            print(f"Expected nodes: {expected_nodes}")
+            cluster_name = f"test-ray-cluster-li-{flavor[-5:]}"
+            queues = list_local_queues(namespace=self.namespace, flavors=[flavor])
+            queue_name = queues[0]["name"] if queues else None
+            print(f"Using flavor: {flavor}, Queue: {queue_name}")
+            cluster = Cluster(
+                ClusterConfiguration(
+                    name=cluster_name,
+                    namespace=self.namespace,
+                    num_workers=1,
+                    head_cpu_requests="500m",
+                    head_cpu_limits="500m",
+                    head_memory_requests=2,
+                    head_memory_limits=2,
+                    worker_cpu_requests="500m",
+                    worker_cpu_limits=1,
+                    worker_memory_requests=1,
+                    worker_memory_limits=4,
+                    worker_extended_resource_requests={
+                        gpu_resource_name: number_of_gpus
+                    },
+                    write_to_file=True,
+                    verify_tls=False,
+                    local_queue=queue_name,
+                )
+            )
+            cluster.up()
+            sleep(5)
+            node_name = get_pod_node(self, self.namespace, cluster_name)
+            print(f"Cluster {cluster_name}-{flavor} is running on node: {node_name}")
+            sleep(5)
+            assert (
+                node_name in expected_nodes
+            ), f"Node {node_name} is not in the expected nodes for flavor {flavor}."
+            cluster.down()
diff --git a/tests/e2e/heterogeneous_clusters_oauth_test.py b/tests/e2e/heterogeneous_clusters_oauth_test.py
new file mode 100644
index 000000000..4a7a687cf
--- /dev/null
+++ b/tests/e2e/heterogeneous_clusters_oauth_test.py
@@ -0,0 +1,77 @@
+from time import sleep
+import time
+from codeflare_sdk import (
+    Cluster,
+    ClusterConfiguration,
+    TokenAuthentication,
+)
+
+from codeflare_sdk.common.kueue.kueue import list_local_queues
+
+import pytest
+
+from support import *
+
+
+@pytest.mark.openshift
+class TestHeterogeneousClustersOauth:
+    def setup_method(self):
+        initialize_kubernetes_client(self)
+
+    def teardown_method(self):
+        delete_namespace(self)
+        delete_kueue_resources(self)
+
+    def test_heterogeneous_clusters(self):
+        create_namespace(self)
+        create_kueue_resources(self, 2)
+        self.run_heterogeneous_clusters()
+
+    def run_heterogeneous_clusters(
+        self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
+    ):
+        ray_image = get_ray_image()
+
+        auth = TokenAuthentication(
+            token=run_oc_command(["whoami", "--show-token=true"]),
+            server=run_oc_command(["whoami", "--show-server=true"]),
+            skip_tls=True,
+        )
+        auth.login()
+
+        for flavor in self.resource_flavors:
+            node_labels = (
+                get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
+            )
+            expected_nodes = get_nodes_by_label(self, node_labels)
+
+            print(f"Expected nodes: {expected_nodes}")
+            cluster_name = f"test-ray-cluster-li-{flavor[-5:]}"
+            queues = list_local_queues(namespace=self.namespace, flavors=[flavor])
+            queue_name = queues[0]["name"] if queues else None
+            print(f"Using flavor: {flavor}, Queue: {queue_name}")
+            cluster = Cluster(
+                ClusterConfiguration(
+                    namespace=self.namespace,
+                    name=cluster_name,
+                    num_workers=1,
+                    head_cpu_requests="500m",
+                    head_cpu_limits="500m",
+                    worker_cpu_requests="500m",
+                    worker_cpu_limits=1,
+                    worker_memory_requests=1,
+                    worker_memory_limits=4,
+                    image=ray_image,
+                    verify_tls=False,
+                    local_queue=queue_name,
+                )
+            )
+            cluster.up()
+            sleep(5)
+            node_name = get_pod_node(self, self.namespace, cluster_name)
+            print(f"Cluster {cluster_name}-{flavor} is running on node: {node_name}")
+            sleep(5)
+            assert (
+                node_name in expected_nodes
+            ), f"Node {node_name} is not in the expected nodes for flavor {flavor}."
+            cluster.down()
diff --git a/tests/e2e/support.py b/tests/e2e/support.py
index 6fdd03af9..604884668 100644
--- a/tests/e2e/support.py
+++ b/tests/e2e/support.py
@@ -1,3 +1,4 @@
+import json
 import os
 import random
 import string
@@ -65,19 +66,30 @@ def create_namespace(self):
         return RuntimeError(e)
 
 
-def create_new_resource_flavor(self):
-    self.resource_flavor = f"test-resource-flavor-{random_choice()}"
-    create_resource_flavor(self, self.resource_flavor)
+def create_new_resource_flavor(self, num_flavors):
+    self.resource_flavors = []
+    for i in range(num_flavors):
+        default = i < 1
+        resource_flavor = f"test-resource-flavor-{random_choice()}"
+        create_resource_flavor(self, resource_flavor, default)
+        self.resource_flavors.append(resource_flavor)
 
 
-def create_new_cluster_queue(self):
-    self.cluster_queue = f"test-cluster-queue-{random_choice()}"
-    create_cluster_queue(self, self.cluster_queue, self.resource_flavor)
+def create_new_cluster_queue(self, num_queues):
+    self.cluster_queues = []
+    for i in range(num_queues):
+        cluster_queue_name = f"test-cluster-queue-{random_choice()}"
+        create_cluster_queue(self, cluster_queue_name, self.resource_flavors[i])
+        self.cluster_queues.append(cluster_queue_name)
 
 
-def create_new_local_queue(self):
-    self.local_queue = f"test-local-queue-{random_choice()}"
-    create_local_queue(self, self.cluster_queue, self.local_queue)
+def create_new_local_queue(self, num_queues):
+    self.local_queues = []
+    for i in range(num_queues):
+        is_default = i == 0
+        local_queue_name = f"test-local-queue-{random_choice()}"
+        create_local_queue(self, self.cluster_queues[i], local_queue_name, is_default)
+        self.local_queues.append(local_queue_name)
 
 
 def create_namespace_with_name(self, namespace_name):
@@ -132,7 +144,7 @@ def create_cluster_queue(self, cluster_queue, flavor):
                                 {"name": "memory", "nominalQuota": "36Gi"},
                                 {"name": "nvidia.com/gpu", "nominalQuota": 1},
                             ],
-                        }
+                        },
                     ],
                 }
             ],
@@ -161,11 +173,33 @@ def create_cluster_queue(self, cluster_queue, flavor):
     self.cluster_queue = cluster_queue
 
 
-def create_resource_flavor(self, flavor):
+def create_resource_flavor(self, flavor, default=True):
+    worker_label, worker_value = os.getenv("WORKER_LABEL", "worker-1=true").split("=")
+    control_label, control_value = os.getenv(
+        "CONTROL_LABEL", "ingress-ready=true"
+    ).split("=")
+    toleration_key = os.getenv(
+        "TOLERATION_KEY", "node-role.kubernetes.io/control-plane"
+    )
+
+    node_labels = (
+        {worker_label: worker_value} if default else {control_label: control_value}
+    )
+
     resource_flavor_json = {
         "apiVersion": "kueue.x-k8s.io/v1beta1",
         "kind": "ResourceFlavor",
         "metadata": {"name": flavor},
+        "spec": {
+            "nodeLabels": node_labels,
+            "tolerations": [
+                {
+                    "key": toleration_key,
+                    "operator": "Exists",
+                    "effect": "NoSchedule",
+                }
+            ],
+        },
     }
 
     try:
@@ -190,14 +224,14 @@ def create_resource_flavor(self, flavor):
     self.resource_flavor = flavor
 
 
-def create_local_queue(self, cluster_queue, local_queue):
+def create_local_queue(self, cluster_queue, local_queue, is_default=True):
     local_queue_json = {
         "apiVersion": "kueue.x-k8s.io/v1beta1",
         "kind": "LocalQueue",
         "metadata": {
             "namespace": self.namespace,
             "name": local_queue,
-            "annotations": {"kueue.x-k8s.io/default-queue": "true"},
+            "annotations": {"kueue.x-k8s.io/default-queue": str(is_default).lower()},
         },
         "spec": {"clusterQueue": cluster_queue},
     }
@@ -226,34 +260,77 @@ def create_local_queue(self, cluster_queue, local_queue):
     self.local_queue = local_queue
 
 
-def create_kueue_resources(self):
+def create_kueue_resources(self, resource_amount=1):
     print("creating Kueue resources ...")
-    create_new_resource_flavor(self)
-    create_new_cluster_queue(self)
-    create_new_local_queue(self)
+    create_new_resource_flavor(self, resource_amount)
+    create_new_cluster_queue(self, resource_amount)
+    create_new_local_queue(self, resource_amount)
 
 
 def delete_kueue_resources(self):
     # Delete if given cluster-queue exists
-    try:
-        self.custom_api.delete_cluster_custom_object(
-            group="kueue.x-k8s.io",
-            plural="clusterqueues",
-            version="v1beta1",
-            name=self.cluster_queue,
-        )
-        print(f"\n'{self.cluster_queue}' cluster-queue deleted")
-    except Exception as e:
-        print(f"\nError deleting cluster-queue '{self.cluster_queue}' : {e}")
+    for cq in self.cluster_queues:
+        try:
+            self.custom_api.delete_cluster_custom_object(
+                group="kueue.x-k8s.io",
+                plural="clusterqueues",
+                version="v1beta1",
+                name=cq,
+            )
+            print(f"\n'{cq}' cluster-queue deleted")
+        except Exception as e:
+            print(f"\nError deleting cluster-queue '{cq}' : {e}")
 
     # Delete if given resource-flavor exists
+    for flavor in self.resource_flavors:
+        try:
+            self.custom_api.delete_cluster_custom_object(
+                group="kueue.x-k8s.io",
+                plural="resourceflavors",
+                version="v1beta1",
+                name=flavor,
+            )
+            print(f"'{flavor}' resource-flavor deleted")
+        except Exception as e:
+            print(f"\nError deleting resource-flavor '{flavor}': {e}")
+
+
+def get_pod_node(self, namespace, name):
+    label_selector = f"ray.io/cluster={name}"
+    pods = self.api_instance.list_namespaced_pod(
+        namespace, label_selector=label_selector
+    )
+    if not pods.items:
+        raise ValueError(
+            f"Unable to retrieve node name for pod '{name}' in namespace '{namespace}'"
+        )
+    pod = pods.items[0]
+    node_name = pod.spec.node_name
+    if node_name is None:
+        raise ValueError(
+            f"No node selected for pod '{name}' in namespace '{namespace}'"
+        )
+    return node_name
+
+
+def get_flavor_spec(self, flavor_name):
     try:
-        self.custom_api.delete_cluster_custom_object(
+        flavor = self.custom_api.get_cluster_custom_object(
             group="kueue.x-k8s.io",
-            plural="resourceflavors",
             version="v1beta1",
-            name=self.resource_flavor,
+            plural="resourceflavors",
+            name=flavor_name,
         )
-        print(f"'{self.resource_flavor}' resource-flavor deleted")
-    except Exception as e:
-        print(f"\nError deleting resource-flavor '{self.resource_flavor}' : {e}")
+        return flavor
+    except client.exceptions.ApiException as e:
+        if e.status == 404:
+            print(f"ResourceFlavor '{flavor_name}' not found.")
+        else:
+            print(f"Error retrieving ResourceFlavor '{flavor_name}': {e}")
+        raise
+
+
+def get_nodes_by_label(self, node_labels):
+    label_selector = ",".join(f"{k}={v}" for k, v in node_labels.items())
+    nodes = self.api_instance.list_node(label_selector=label_selector)
+    return [node.metadata.name for node in nodes.items]