From e88832acff3bd1ea536c756b7d98a8691db1ac80 Mon Sep 17 00:00:00 2001 From: Kevin Date: Wed, 9 Oct 2024 15:49:54 -0400 Subject: [PATCH] add function for listing lqs by flavors Signed-off-by: Kevin --- .../kubernetes_cluster/kube_api_helpers.py | 20 +++++++ src/codeflare_sdk/common/kueue/kueue.py | 54 ++++++++++++++++++- src/codeflare_sdk/common/widgets/widgets.py | 5 +- src/codeflare_sdk/ray/cluster/cluster.py | 20 +------ 4 files changed, 77 insertions(+), 22 deletions(-) diff --git a/src/codeflare_sdk/common/kubernetes_cluster/kube_api_helpers.py b/src/codeflare_sdk/common/kubernetes_cluster/kube_api_helpers.py index efa1d2b6c..0d4b27943 100644 --- a/src/codeflare_sdk/common/kubernetes_cluster/kube_api_helpers.py +++ b/src/codeflare_sdk/common/kubernetes_cluster/kube_api_helpers.py @@ -20,6 +20,7 @@ import executing from kubernetes import client, config from urllib3.util import parse_url +import os # private methods @@ -49,3 +50,22 @@ def _kube_api_error_handling( elif e.reason == "Conflict": raise FileExistsError(exists_msg) raise e + + +def get_current_namespace(): # pragma: no cover + if os.path.isfile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"): + try: + file = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") + active_context = file.readline().strip("\n") + return active_context + except Exception as e: + print("Unable to find current namespace") + print("trying to gather from current context") + try: + _, active_context = config.list_kube_config_contexts(config_check()) + except Exception as e: + return _kube_api_error_handling(e) + try: + return active_context["context"]["namespace"] + except KeyError: + return None diff --git a/src/codeflare_sdk/common/kueue/kueue.py b/src/codeflare_sdk/common/kueue/kueue.py index 0c207548d..c30608bff 100644 --- a/src/codeflare_sdk/common/kueue/kueue.py +++ b/src/codeflare_sdk/common/kueue/kueue.py @@ -12,12 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional +from typing import Optional, List from codeflare_sdk.common import _kube_api_error_handling from codeflare_sdk.common.kubernetes_cluster.auth import config_check, get_api_client from kubernetes import client from kubernetes.client.exceptions import ApiException +from codeflare_sdk.common.kubernetes_cluster.kube_api_helpers import ( + get_current_namespace, +) + def get_default_kueue_name(namespace: str): # If the local queue is set, use it. Otherwise, try to use the default queue. @@ -45,6 +49,54 @@ def get_default_kueue_name(namespace: str): return lq["metadata"]["name"] +def list_local_queues( + namespace: Optional[str] = None, flavors: Optional[List[str]] = None +) -> List[dict]: + """ + This function lists all local queues in the namespace provided. + + If no namespace is provided, it will use the current namespace. If flavors is provided, it will only return local + queues that support all the flavors provided. + + Note: + Depending on the version of the local queue API, the available flavors may not be present in the response. + + Args: + namespace (str, optional): The namespace to list local queues from. Defaults to None. + flavors (List[str], optional): The flavors to filter local queues by. Defaults to None. + Returns: + List[dict]: A list of dictionaries containing the name of the local queue and the available flavors + """ + if namespace is None: + namespace = get_current_namespace() + try: + config_check() + api_instance = client.CustomObjectsApi(get_api_client()) + local_queues = api_instance.list_namespace_custom_object( + group="kueue.x-k8s.io", + version="v1beta1", + namespace=namespace, + plural="localqueues", + ) + except ApiException as e: + return _kube_api_error_handling(e) + to_return = [] + for lq in local_queues["items"]: + item = {"name": lq["metadata"]["name"]} + if "availableFlavors" in lq["status"]: + item["availableFlavors"] = [ + f["name"] for f in lq["status"]["availableFlavors"] + ] + if flavors is not None and not set(flavors).issubset( + set(item["availableFlavors"]) + ): + continue + elif flavors is not None: + continue # NOTE: may be indicative old local queue API and might be worth while raising or warning here + to_return.append(item) + return to_return + + def local_queue_exists(namespace: str, local_queue_name: str): # get all local queues in the namespace try: diff --git a/src/codeflare_sdk/common/widgets/widgets.py b/src/codeflare_sdk/common/widgets/widgets.py index 8a13a4d4d..8337f7ad0 100644 --- a/src/codeflare_sdk/common/widgets/widgets.py +++ b/src/codeflare_sdk/common/widgets/widgets.py @@ -33,6 +33,9 @@ config_check, get_api_client, ) +from codeflare_sdk.common.kubernetes_cluster.kube_api_helpers import ( + get_current_namespace, +) class RayClusterManagerWidgets: @@ -43,8 +46,6 @@ class RayClusterManagerWidgets: """ def __init__(self, ray_clusters_df: pd.DataFrame, namespace: str = None): - from ...ray.cluster.cluster import get_current_namespace - # Data self.ray_clusters_df = ray_clusters_df self.namespace = get_current_namespace() if not namespace else namespace diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py index da87639c5..233ab9b88 100644 --- a/src/codeflare_sdk/ray/cluster/cluster.py +++ b/src/codeflare_sdk/ray/cluster/cluster.py @@ -27,6 +27,7 @@ config_check, get_api_client, ) +from ...common.kubernetes_cluster.kube_api_helpers import get_current_namespace from . import pretty_print from .generate_yaml import ( generate_appwrapper, @@ -573,25 +574,6 @@ def list_all_queued( return resources -def get_current_namespace(): # pragma: no cover - if os.path.isfile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"): - try: - file = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") - active_context = file.readline().strip("\n") - return active_context - except Exception as e: - print("Unable to find current namespace") - print("trying to gather from current context") - try: - _, active_context = config.list_kube_config_contexts(config_check()) - except Exception as e: - return _kube_api_error_handling(e) - try: - return active_context["context"]["namespace"] - except KeyError: - return None - - def get_cluster( cluster_name: str, namespace: str = "default",