|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
15 |
| -from typing import Optional |
| 15 | +from typing import Optional, List |
16 | 16 | from codeflare_sdk.common import _kube_api_error_handling
|
17 | 17 | from codeflare_sdk.common.kubernetes_cluster.auth import config_check, get_api_client
|
18 | 18 | from kubernetes import client
|
19 | 19 | from kubernetes.client.exceptions import ApiException
|
20 | 20 |
|
| 21 | +from codeflare_sdk.common.kubernetes_cluster.kube_api_helpers import ( |
| 22 | + get_current_namespace, |
| 23 | +) |
| 24 | + |
21 | 25 |
|
22 | 26 | def get_default_kueue_name(namespace: str):
|
23 | 27 | # If the local queue is set, use it. Otherwise, try to use the default queue.
|
@@ -45,6 +49,50 @@ def get_default_kueue_name(namespace: str):
|
45 | 49 | return lq["metadata"]["name"]
|
46 | 50 |
|
47 | 51 |
|
| 52 | +def list_local_queues( |
| 53 | + namespace: Optional[str] = None, flavors: Optional[List[str]] = None |
| 54 | +) -> List[dict]: |
| 55 | + """ |
| 56 | + This function lists all local queues in the namespace provided. |
| 57 | +
|
| 58 | + If no namespace is provided, it will use the current namespace. If flavors is provided, it will only return local |
| 59 | + queues that support all the flavors provided. |
| 60 | +
|
| 61 | + Note: |
| 62 | + Depending on the version of the local queue API, the available flavors may not be present in the response. |
| 63 | +
|
| 64 | + Args: |
| 65 | + namespace (str, optional): The namespace to list local queues from. Defaults to None. |
| 66 | + flavors (List[str], optional): The flavors to filter local queues by. Defaults to None. |
| 67 | + Returns: |
| 68 | + List[dict]: A list of dictionaries containing the name of the local queue and the available flavors |
| 69 | + """ |
| 70 | + if namespace is None: # pragma: no cover |
| 71 | + namespace = get_current_namespace() |
| 72 | + try: |
| 73 | + config_check() |
| 74 | + api_instance = client.CustomObjectsApi(get_api_client()) |
| 75 | + local_queues = api_instance.list_namespaced_custom_object( |
| 76 | + group="kueue.x-k8s.io", |
| 77 | + version="v1beta1", |
| 78 | + namespace=namespace, |
| 79 | + plural="localqueues", |
| 80 | + ) |
| 81 | + except ApiException as e: # pragma: no cover |
| 82 | + return _kube_api_error_handling(e) |
| 83 | + to_return = [] |
| 84 | + for lq in local_queues["items"]: |
| 85 | + item = {"name": lq["metadata"]["name"]} |
| 86 | + if "flavors" in lq["status"]: |
| 87 | + item["flavors"] = [f["name"] for f in lq["status"]["flavors"]] |
| 88 | + if flavors is not None and not set(flavors).issubset(set(item["flavors"])): |
| 89 | + continue |
| 90 | + elif flavors is not None: |
| 91 | + continue # NOTE: may be indicative old local queue API and might be worth while raising or warning here |
| 92 | + to_return.append(item) |
| 93 | + return to_return |
| 94 | + |
| 95 | + |
48 | 96 | def local_queue_exists(namespace: str, local_queue_name: str):
|
49 | 97 | # get all local queues in the namespace
|
50 | 98 | try:
|
|
0 commit comments