diff --git a/docs/cluster/auth.html b/docs/cluster/auth.html index 7927b8c8d..dbec66ace 100644 --- a/docs/cluster/auth.html +++ b/docs/cluster/auth.html @@ -53,8 +53,12 @@

Module codeflare_sdk.cluster.auth

""" import abc -import openshift as oc -from openshift import OpenShiftPythonException +from kubernetes import client, config + +global api_client +api_client = None +global config_path +config_path = None class Authentication(metaclass=abc.ABCMeta): @@ -76,83 +80,134 @@

Module codeflare_sdk.cluster.auth

pass +class KubeConfiguration(metaclass=abc.ABCMeta): + """ + An abstract class that defines the method for loading a user defined config file using the `load_kube_config()` function + """ + + def load_kube_config(self): + """ + Method for setting your Kubernetes configuration to a certain file + """ + pass + + def logout(self): + """ + Method for logging out of the remote cluster + """ + pass + + class TokenAuthentication(Authentication): """ - `TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to an OpenShift + `TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to a Kubernetes cluster when the user has an API token and the API server address. """ - def __init__(self, token: str = None, server: str = None, skip_tls: bool = False): + def __init__( + self, + token: str, + server: str, + skip_tls: bool = False, + ca_cert_path: str = None, + ): """ Initialize a TokenAuthentication object that requires a value for `token`, the API Token - and `server`, the API server address for authenticating to an OpenShift cluster. + and `server`, the API server address for authenticating to a Kubernetes cluster. """ self.token = token self.server = server self.skip_tls = skip_tls + self.ca_cert_path = ca_cert_path def login(self) -> str: """ - This function is used to login to an OpenShift cluster using the user's API token and API server address. - Depending on the cluster, a user can choose to login in with "--insecure-skip-tls-verify` by setting `skip_tls` - to `True`. + This function is used to log in to a Kubernetes cluster using the user's API token and API server address. + Depending on the cluster, a user can choose to login in with `--insecure-skip-tls-verify` by setting `skip_tls` + to `True` or `--certificate-authority` by setting `skip_tls` to False and providing a path to a ca bundle with `ca_cert_path`. """ - args = [f"--token={self.token}", f"--server={self.server}"] - if self.skip_tls: - args.append("--insecure-skip-tls-verify") + global config_path + global api_client try: - response = oc.invoke("login", args) - except OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if "The server uses a certificate signed by unknown authority" in error_msg: - return "Error: certificate auth failure, please set `skip_tls=True` in TokenAuthentication" - elif "invalid" in error_msg: - raise PermissionError(error_msg) + configuration = client.Configuration() + configuration.api_key_prefix["authorization"] = "Bearer" + configuration.host = self.server + configuration.api_key["authorization"] = self.token + if self.skip_tls == False and self.ca_cert_path == None: + configuration.verify_ssl = True + elif self.skip_tls == False: + configuration.ssl_ca_cert = self.ca_cert_path else: - return error_msg - return response.out() + configuration.verify_ssl = False + api_client = client.ApiClient(configuration) + client.AuthenticationApi(api_client).get_api_group() + config_path = None + return "Logged into %s" % self.server + except client.ApiException: # pragma: no cover + api_client = None + print("Authentication Error please provide the correct token + server") def logout(self) -> str: """ - This function is used to logout of an OpenShift cluster. + This function is used to logout of a Kubernetes cluster. """ - args = [f"--token={self.token}", f"--server={self.server}"] - response = oc.invoke("logout", args) - return response.out() + global config_path + config_path = None + global api_client + api_client = None + return "Successfully logged out of %s" % self.server -class PasswordUserAuthentication(Authentication): +class KubeConfigFileAuthentication(KubeConfiguration): """ - `PasswordUserAuthentication` is a subclass of `Authentication`. It can be used to authenticate to an OpenShift - cluster when the user has a username and password. + A class that defines the necessary methods for passing a user's own Kubernetes config file. + Specifically this class defines the `load_kube_config()` and `config_check()` functions. """ - def __init__( - self, - username: str = None, - password: str = None, - ): - """ - Initialize a PasswordUserAuthentication object that requires a value for `username` - and `password` for authenticating to an OpenShift cluster. - """ - self.username = username - self.password = password + def __init__(self, kube_config_path: str = None): + self.kube_config_path = kube_config_path - def login(self) -> str: + def load_kube_config(self): """ - This function is used to login to an OpenShift cluster using the user's `username` and `password`. + Function for loading a user's own predefined Kubernetes config file. """ - response = oc.login(self.username, self.password) - return response.out() + global config_path + global api_client + try: + if self.kube_config_path == None: + return "Please specify a config file path" + config_path = self.kube_config_path + api_client = None + config.load_kube_config(config_path) + response = "Loaded user config file at path %s" % self.kube_config_path + except config.ConfigException: # pragma: no cover + config_path = None + raise Exception("Please specify a config file path") + return response - def logout(self) -> str: - """ - This function is used to logout of an OpenShift cluster. - """ - response = oc.invoke("logout") - return response.out() + +def config_check() -> str: + """ + Function for loading the config file at the default config location ~/.kube/config if the user has not + specified their own config file or has logged in with their token and server. + """ + global config_path + global api_client + if config_path == None and api_client == None: + config.load_kube_config() + if config_path != None and api_client == None: + return config_path + + +def api_config_handler() -> str: + """ + This function is used to load the api client if the user has logged in + """ + if api_client != None and config_path == None: + return api_client + else: + return None
@@ -160,6 +215,51 @@

Module codeflare_sdk.cluster.auth

+

Functions

+
+
+def api_config_handler() ‑> str +
+
+

This function is used to load the api client if the user has logged in

+
+ +Expand source code + +
def api_config_handler() -> str:
+    """
+    This function is used to load the api client if the user has logged in
+    """
+    if api_client != None and config_path == None:
+        return api_client
+    else:
+        return None
+
+
+
+def config_check() ‑> str +
+
+

Function for loading the config file at the default config location ~/.kube/config if the user has not +specified their own config file or has logged in with their token and server.

+
+ +Expand source code + +
def config_check() -> str:
+    """
+    Function for loading the config file at the default config location ~/.kube/config if the user has not
+    specified their own config file or has logged in with their token and server.
+    """
+    global config_path
+    global api_client
+    if config_path == None and api_client == None:
+        config.load_kube_config()
+    if config_path != None and api_client == None:
+        return config_path
+
+
+

Classes

@@ -194,7 +294,6 @@

Classes

Subclasses

Methods

@@ -233,150 +332,226 @@

Methods

-
-class PasswordUserAuthentication -(username: str = None, password: str = None) +
+class KubeConfigFileAuthentication +(kube_config_path: str = None)
-

PasswordUserAuthentication is a subclass of Authentication. It can be used to authenticate to an OpenShift -cluster when the user has a username and password.

-

Initialize a PasswordUserAuthentication object that requires a value for username -and password for authenticating to an OpenShift cluster.

+

A class that defines the necessary methods for passing a user's own Kubernetes config file. +Specifically this class defines the load_kube_config() and config_check() functions.

Expand source code -
class PasswordUserAuthentication(Authentication):
+
class KubeConfigFileAuthentication(KubeConfiguration):
     """
-    `PasswordUserAuthentication` is a subclass of `Authentication`. It can be used to authenticate to an OpenShift
-    cluster when the user has a username and password.
+    A class that defines the necessary methods for passing a user's own Kubernetes config file.
+    Specifically this class defines the `load_kube_config()` and `config_check()` functions.
     """
 
-    def __init__(
-        self,
-        username: str = None,
-        password: str = None,
-    ):
+    def __init__(self, kube_config_path: str = None):
+        self.kube_config_path = kube_config_path
+
+    def load_kube_config(self):
         """
-        Initialize a PasswordUserAuthentication object that requires a value for `username`
-        and `password` for authenticating to an OpenShift cluster.
+        Function for loading a user's own predefined Kubernetes config file.
         """
-        self.username = username
-        self.password = password
+        global config_path
+        global api_client
+        try:
+            if self.kube_config_path == None:
+                return "Please specify a config file path"
+            config_path = self.kube_config_path
+            api_client = None
+            config.load_kube_config(config_path)
+            response = "Loaded user config file at path %s" % self.kube_config_path
+        except config.ConfigException:  # pragma: no cover
+            config_path = None
+            raise Exception("Please specify a config file path")
+        return response
+
+

Ancestors

+ +

Methods

+
+
+def load_kube_config(self) +
+
+

Function for loading a user's own predefined Kubernetes config file.

+
+ +Expand source code + +
def load_kube_config(self):
+    """
+    Function for loading a user's own predefined Kubernetes config file.
+    """
+    global config_path
+    global api_client
+    try:
+        if self.kube_config_path == None:
+            return "Please specify a config file path"
+        config_path = self.kube_config_path
+        api_client = None
+        config.load_kube_config(config_path)
+        response = "Loaded user config file at path %s" % self.kube_config_path
+    except config.ConfigException:  # pragma: no cover
+        config_path = None
+        raise Exception("Please specify a config file path")
+    return response
+
+
+
+

Inherited members

+ +
+
+class KubeConfiguration +
+
+

An abstract class that defines the method for loading a user defined config file using the load_kube_config() function

+
+ +Expand source code + +
class KubeConfiguration(metaclass=abc.ABCMeta):
+    """
+    An abstract class that defines the method for loading a user defined config file using the `load_kube_config()` function
+    """
 
-    def login(self) -> str:
+    def load_kube_config(self):
         """
-        This function is used to login to an OpenShift cluster using the user's `username` and `password`.
+        Method for setting your Kubernetes configuration to a certain file
         """
-        response = oc.login(self.username, self.password)
-        return response.out()
+        pass
 
-    def logout(self) -> str:
+    def logout(self):
         """
-        This function is used to logout of an OpenShift cluster.
+        Method for logging out of the remote cluster
         """
-        response = oc.invoke("logout")
-        return response.out()
+ pass
-

Ancestors

+

Subclasses

Methods

-
-def login(self) ‑> str +
+def load_kube_config(self)
-

This function is used to login to an OpenShift cluster using the user's username and password.

+

Method for setting your Kubernetes configuration to a certain file

Expand source code -
def login(self) -> str:
+
def load_kube_config(self):
     """
-    This function is used to login to an OpenShift cluster using the user's `username` and `password`.
+    Method for setting your Kubernetes configuration to a certain file
     """
-    response = oc.login(self.username, self.password)
-    return response.out()
+ pass
-
-def logout(self) ‑> str +
+def logout(self)
-

This function is used to logout of an OpenShift cluster.

+

Method for logging out of the remote cluster

Expand source code -
def logout(self) -> str:
+
def logout(self):
     """
-    This function is used to logout of an OpenShift cluster.
+    Method for logging out of the remote cluster
     """
-    response = oc.invoke("logout")
-    return response.out()
+ pass
class TokenAuthentication -(token: str = None, server: str = None, skip_tls: bool = False) +(token: str, server: str, skip_tls: bool = False, ca_cert_path: str = None)
-

TokenAuthentication is a subclass of Authentication. It can be used to authenticate to an OpenShift +

TokenAuthentication is a subclass of Authentication. It can be used to authenticate to a Kubernetes cluster when the user has an API token and the API server address.

Initialize a TokenAuthentication object that requires a value for token, the API Token -and server, the API server address for authenticating to an OpenShift cluster.

+and server, the API server address for authenticating to a Kubernetes cluster.

Expand source code
class TokenAuthentication(Authentication):
     """
-    `TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to an OpenShift
+    `TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to a Kubernetes
     cluster when the user has an API token and the API server address.
     """
 
-    def __init__(self, token: str = None, server: str = None, skip_tls: bool = False):
+    def __init__(
+        self,
+        token: str,
+        server: str,
+        skip_tls: bool = False,
+        ca_cert_path: str = None,
+    ):
         """
         Initialize a TokenAuthentication object that requires a value for `token`, the API Token
-        and `server`, the API server address for authenticating to an OpenShift cluster.
+        and `server`, the API server address for authenticating to a Kubernetes cluster.
         """
 
         self.token = token
         self.server = server
         self.skip_tls = skip_tls
+        self.ca_cert_path = ca_cert_path
 
     def login(self) -> str:
         """
-        This function is used to login to an OpenShift cluster using the user's API token and API server address.
-        Depending on the cluster, a user can choose to login in with "--insecure-skip-tls-verify` by setting `skip_tls`
-        to `True`.
+        This function is used to log in to a Kubernetes cluster using the user's API token and API server address.
+        Depending on the cluster, a user can choose to login in with `--insecure-skip-tls-verify` by setting `skip_tls`
+        to `True` or `--certificate-authority` by setting `skip_tls` to False and providing a path to a ca bundle with `ca_cert_path`.
         """
-        args = [f"--token={self.token}", f"--server={self.server}"]
-        if self.skip_tls:
-            args.append("--insecure-skip-tls-verify")
+        global config_path
+        global api_client
         try:
-            response = oc.invoke("login", args)
-        except OpenShiftPythonException as osp:  # pragma: no cover
-            error_msg = osp.result.err()
-            if "The server uses a certificate signed by unknown authority" in error_msg:
-                return "Error: certificate auth failure, please set `skip_tls=True` in TokenAuthentication"
-            elif "invalid" in error_msg:
-                raise PermissionError(error_msg)
+            configuration = client.Configuration()
+            configuration.api_key_prefix["authorization"] = "Bearer"
+            configuration.host = self.server
+            configuration.api_key["authorization"] = self.token
+            if self.skip_tls == False and self.ca_cert_path == None:
+                configuration.verify_ssl = True
+            elif self.skip_tls == False:
+                configuration.ssl_ca_cert = self.ca_cert_path
             else:
-                return error_msg
-        return response.out()
+                configuration.verify_ssl = False
+            api_client = client.ApiClient(configuration)
+            client.AuthenticationApi(api_client).get_api_group()
+            config_path = None
+            return "Logged into %s" % self.server
+        except client.ApiException:  # pragma: no cover
+            api_client = None
+            print("Authentication Error please provide the correct token + server")
 
     def logout(self) -> str:
         """
-        This function is used to logout of an OpenShift cluster.
+        This function is used to logout of a Kubernetes cluster.
         """
-        args = [f"--token={self.token}", f"--server={self.server}"]
-        response = oc.invoke("logout", args)
-        return response.out()
+ global config_path + config_path = None + global api_client + api_client = None + return "Successfully logged out of %s" % self.server

Ancestors

    @@ -388,51 +563,59 @@

    Methods

    def login(self) ‑> str
    -

    This function is used to login to an OpenShift cluster using the user's API token and API server address. -Depending on the cluster, a user can choose to login in with "–insecure-skip-tls-verify by setting skip_tls` -to True.

    +

    This function is used to log in to a Kubernetes cluster using the user's API token and API server address. +Depending on the cluster, a user can choose to login in with --insecure-skip-tls-verify by setting skip_tls +to True or --certificate-authority by setting skip_tls to False and providing a path to a ca bundle with ca_cert_path.

    Expand source code
    def login(self) -> str:
         """
    -    This function is used to login to an OpenShift cluster using the user's API token and API server address.
    -    Depending on the cluster, a user can choose to login in with "--insecure-skip-tls-verify` by setting `skip_tls`
    -    to `True`.
    +    This function is used to log in to a Kubernetes cluster using the user's API token and API server address.
    +    Depending on the cluster, a user can choose to login in with `--insecure-skip-tls-verify` by setting `skip_tls`
    +    to `True` or `--certificate-authority` by setting `skip_tls` to False and providing a path to a ca bundle with `ca_cert_path`.
         """
    -    args = [f"--token={self.token}", f"--server={self.server}"]
    -    if self.skip_tls:
    -        args.append("--insecure-skip-tls-verify")
    +    global config_path
    +    global api_client
         try:
    -        response = oc.invoke("login", args)
    -    except OpenShiftPythonException as osp:  # pragma: no cover
    -        error_msg = osp.result.err()
    -        if "The server uses a certificate signed by unknown authority" in error_msg:
    -            return "Error: certificate auth failure, please set `skip_tls=True` in TokenAuthentication"
    -        elif "invalid" in error_msg:
    -            raise PermissionError(error_msg)
    +        configuration = client.Configuration()
    +        configuration.api_key_prefix["authorization"] = "Bearer"
    +        configuration.host = self.server
    +        configuration.api_key["authorization"] = self.token
    +        if self.skip_tls == False and self.ca_cert_path == None:
    +            configuration.verify_ssl = True
    +        elif self.skip_tls == False:
    +            configuration.ssl_ca_cert = self.ca_cert_path
             else:
    -            return error_msg
    -    return response.out()
    + configuration.verify_ssl = False + api_client = client.ApiClient(configuration) + client.AuthenticationApi(api_client).get_api_group() + config_path = None + return "Logged into %s" % self.server + except client.ApiException: # pragma: no cover + api_client = None + print("Authentication Error please provide the correct token + server")
    def logout(self) ‑> str
    -

    This function is used to logout of an OpenShift cluster.

    +

    This function is used to logout of a Kubernetes cluster.

    Expand source code
    def logout(self) -> str:
         """
    -    This function is used to logout of an OpenShift cluster.
    +    This function is used to logout of a Kubernetes cluster.
         """
    -    args = [f"--token={self.token}", f"--server={self.server}"]
    -    response = oc.invoke("logout", args)
    -    return response.out()
    + global config_path + config_path = None + global api_client + api_client = None + return "Successfully logged out of %s" % self.server
    @@ -451,6 +634,12 @@

    Index

  • codeflare_sdk.cluster
+
  • Functions

    + +
  • Classes

    • @@ -461,10 +650,16 @@

      PasswordUserAuthentication

      +

      KubeConfigFileAuthentication

      + +
    • +
    • +

      KubeConfiguration

    • diff --git a/docs/cluster/awload.html b/docs/cluster/awload.html index 2d4c776d1..d2afb7ea9 100644 --- a/docs/cluster/awload.html +++ b/docs/cluster/awload.html @@ -51,9 +51,12 @@

      Module codeflare_sdk.cluster.awload

      from os.path import isfile import errno import os -import openshift as oc import yaml +from kubernetes import client, config +from ..utils.kube_api_helpers import _kube_api_error_handling +from .auth import config_check, api_config_handler + class AWManager: """ @@ -71,10 +74,10 @@

      Module codeflare_sdk.cluster.awload

      self.filename = filename try: with open(self.filename) as f: - awyaml = yaml.load(f, Loader=yaml.FullLoader) - assert awyaml["kind"] == "AppWrapper" - self.name = awyaml["metadata"]["name"] - self.namespace = awyaml["metadata"]["namespace"] + self.awyaml = yaml.load(f, Loader=yaml.FullLoader) + assert self.awyaml["kind"] == "AppWrapper" + self.name = self.awyaml["metadata"]["name"] + self.namespace = self.awyaml["metadata"]["namespace"] except: raise ValueError( f"{filename } is not a correctly formatted AppWrapper yaml" @@ -86,19 +89,17 @@

      Module codeflare_sdk.cluster.awload

      Attempts to create the AppWrapper custom resource using the yaml file """ try: - with oc.project(self.namespace): - oc.invoke("create", ["-f", self.filename]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if "Unauthorized" in error_msg or "Forbidden" in error_msg: - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - elif "AlreadyExists" in error_msg: - raise FileExistsError( - f"An AppWrapper of the name {self.name} already exists in namespace {self.namespace}" - ) - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.create_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=self.namespace, + plural="appwrappers", + body=self.awyaml, + ) + except Exception as e: + return _kube_api_error_handling(e) self.submitted = True print(f"AppWrapper {self.filename} submitted!") @@ -113,25 +114,17 @@

      Module codeflare_sdk.cluster.awload

      return try: - with oc.project(self.namespace): - oc.invoke("delete", ["AppWrapper", self.name]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if ( - 'the server doesn\'t have a resource type "AppWrapper"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - elif "not found" in error_msg: - self.submitted = False - print("AppWrapper not found, was deleted in another manner") - return - else: - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.delete_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=self.namespace, + plural="appwrappers", + name=self.name, + ) + except Exception as e: + return _kube_api_error_handling(e) self.submitted = False print(f"AppWrapper {self.name} removed!") @@ -175,10 +168,10 @@

      Classes

      self.filename = filename try: with open(self.filename) as f: - awyaml = yaml.load(f, Loader=yaml.FullLoader) - assert awyaml["kind"] == "AppWrapper" - self.name = awyaml["metadata"]["name"] - self.namespace = awyaml["metadata"]["namespace"] + self.awyaml = yaml.load(f, Loader=yaml.FullLoader) + assert self.awyaml["kind"] == "AppWrapper" + self.name = self.awyaml["metadata"]["name"] + self.namespace = self.awyaml["metadata"]["namespace"] except: raise ValueError( f"{filename } is not a correctly formatted AppWrapper yaml" @@ -190,19 +183,17 @@

      Classes

      Attempts to create the AppWrapper custom resource using the yaml file """ try: - with oc.project(self.namespace): - oc.invoke("create", ["-f", self.filename]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if "Unauthorized" in error_msg or "Forbidden" in error_msg: - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - elif "AlreadyExists" in error_msg: - raise FileExistsError( - f"An AppWrapper of the name {self.name} already exists in namespace {self.namespace}" - ) - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.create_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=self.namespace, + plural="appwrappers", + body=self.awyaml, + ) + except Exception as e: + return _kube_api_error_handling(e) self.submitted = True print(f"AppWrapper {self.filename} submitted!") @@ -217,25 +208,17 @@

      Classes

      return try: - with oc.project(self.namespace): - oc.invoke("delete", ["AppWrapper", self.name]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if ( - 'the server doesn\'t have a resource type "AppWrapper"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - elif "not found" in error_msg: - self.submitted = False - print("AppWrapper not found, was deleted in another manner") - return - else: - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.delete_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=self.namespace, + plural="appwrappers", + name=self.name, + ) + except Exception as e: + return _kube_api_error_handling(e) self.submitted = False print(f"AppWrapper {self.name} removed!") @@ -262,25 +245,17 @@

      Methods

      return try: - with oc.project(self.namespace): - oc.invoke("delete", ["AppWrapper", self.name]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if ( - 'the server doesn\'t have a resource type "AppWrapper"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - elif "not found" in error_msg: - self.submitted = False - print("AppWrapper not found, was deleted in another manner") - return - else: - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.delete_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=self.namespace, + plural="appwrappers", + name=self.name, + ) + except Exception as e: + return _kube_api_error_handling(e) self.submitted = False print(f"AppWrapper {self.name} removed!") @@ -300,19 +275,17 @@

      Methods

      Attempts to create the AppWrapper custom resource using the yaml file """ try: - with oc.project(self.namespace): - oc.invoke("create", ["-f", self.filename]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if "Unauthorized" in error_msg or "Forbidden" in error_msg: - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - elif "AlreadyExists" in error_msg: - raise FileExistsError( - f"An AppWrapper of the name {self.name} already exists in namespace {self.namespace}" - ) - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.create_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=self.namespace, + plural="appwrappers", + body=self.awyaml, + ) + except Exception as e: + return _kube_api_error_handling(e) self.submitted = True print(f"AppWrapper {self.filename} submitted!") diff --git a/docs/cluster/cluster.html b/docs/cluster/cluster.html index 1c04bc067..e8205b425 100644 --- a/docs/cluster/cluster.html +++ b/docs/cluster/cluster.html @@ -50,15 +50,15 @@

      Module codeflare_sdk.cluster.cluster

      cluster setup queue, a list of all existing clusters, and the user's working namespace. """ -from os import stat from time import sleep from typing import List, Optional, Tuple, Dict -import openshift as oc from ray.job_submission import JobSubmissionClient +from .auth import config_check, api_config_handler from ..utils import pretty_print from ..utils.generate_yaml import generate_appwrapper +from ..utils.kube_api_helpers import _kube_api_error_handling from .config import ClusterConfiguration from .model import ( AppWrapper, @@ -67,6 +67,9 @@

      Module codeflare_sdk.cluster.cluster

      RayCluster, RayClusterStatus, ) +from kubernetes import client, config +import yaml +import os class Cluster: @@ -97,8 +100,10 @@

      Module codeflare_sdk.cluster.cluster

      """ if self.config.namespace is None: - self.config.namespace = oc.get_project_name() - if type(self.config.namespace) is not str: + self.config.namespace = get_current_namespace() + if self.config.namespace is None: + print("Please specify with namespace=<your_current_namespace>") + elif type(self.config.namespace) is not str: raise TypeError( f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication." ) @@ -109,8 +114,8 @@

      Module codeflare_sdk.cluster.cluster

      max_cpu = self.config.max_cpus min_memory = self.config.min_memory max_memory = self.config.max_memory - gpu = self.config.gpu - workers = self.config.max_worker + gpu = self.config.num_gpus + workers = self.config.num_workers template = self.config.template image = self.config.image instascale = self.config.instascale @@ -144,15 +149,19 @@

      Module codeflare_sdk.cluster.cluster

      """ namespace = self.config.namespace try: - with oc.project(namespace): - oc.invoke("apply", ["-f", self.app_wrapper_yaml]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if "Unauthorized" in error_msg: - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + with open(self.app_wrapper_yaml) as f: + aw = yaml.load(f, Loader=yaml.FullLoader) + api_instance.create_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + body=aw, + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) def down(self): """ @@ -161,23 +170,17 @@

      Module codeflare_sdk.cluster.cluster

      """ namespace = self.config.namespace try: - with oc.project(namespace): - oc.invoke("delete", ["AppWrapper", self.app_wrapper_name]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if ( - 'the server doesn\'t have a resource type "AppWrapper"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise PermissionError( - "Action not permitted, have you run auth.login()/cluster.up() yet?" - ) - elif "not found" in error_msg: - print("Cluster not found, have you run cluster.up() yet?") - else: - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.delete_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + name=self.app_wrapper_name, + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) def status( self, print_to_console: bool = True @@ -205,9 +208,15 @@

      Module codeflare_sdk.cluster.cluster

      ready = False status = CodeFlareClusterStatus.FAILED # should deleted be separate return status, ready # exit early, no need to check ray status - elif appwrapper.status in [AppWrapperStatus.PENDING]: + elif appwrapper.status in [ + AppWrapperStatus.PENDING, + AppWrapperStatus.QUEUEING, + ]: ready = False - status = CodeFlareClusterStatus.QUEUED + if appwrapper.status == AppWrapperStatus.PENDING: + status = CodeFlareClusterStatus.QUEUED + else: + status = CodeFlareClusterStatus.QUEUEING if print_to_console: pretty_print.print_app_wrappers_status([appwrapper]) return ( @@ -230,7 +239,7 @@

      Module codeflare_sdk.cluster.cluster

      if print_to_console: # overriding the number of gpus with requested - cluster.worker_gpu = self.config.gpu + cluster.worker_gpu = self.config.num_gpus pretty_print.print_cluster_status(cluster) elif print_to_console: if status == CodeFlareClusterStatus.UNKNOWN: @@ -279,16 +288,21 @@

      Module codeflare_sdk.cluster.cluster

      Returns a string containing the cluster's dashboard URI. """ try: - with oc.project(self.config.namespace): - route = oc.invoke( - "get", ["route", "-o", "jsonpath='{$.items[*].spec.host}'"] - ) - route = route.out().split(" ") - route = [x for x in route if f"ray-dashboard-{self.config.name}" in x] - route = route[0].strip().strip("'") - return f"http://{route}" - except: - return "Dashboard route not available yet, have you run cluster.up()?" + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + routes = api_instance.list_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=self.config.namespace, + plural="routes", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for route in routes["items"]: + if route["metadata"]["name"] == f"ray-dashboard-{self.config.name}": + return f"http://{route['spec']['host']}" + return "Dashboard route not available yet, have you run cluster.up()?" def list_jobs(self) -> List: """ @@ -328,6 +342,55 @@

      Module codeflare_sdk.cluster.cluster

      to_return["requirements"] = requirements return to_return + def from_k8_cluster_object(rc): + machine_types = ( + rc["metadata"]["labels"]["orderedinstance"].split("_") + if "orderedinstance" in rc["metadata"]["labels"] + else [] + ) + local_interactive = ( + "volumeMounts" + in rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0] + ) + cluster_config = ClusterConfiguration( + name=rc["metadata"]["name"], + namespace=rc["metadata"]["namespace"], + machine_types=machine_types, + num_workers=rc["spec"]["workerGroupSpecs"][0]["minReplicas"], + min_cpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["requests"]["cpu"], + max_cpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["limits"]["cpu"], + min_memory=int( + rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][ + "resources" + ]["requests"]["memory"][:-1] + ), + max_memory=int( + rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][ + "resources" + ]["limits"]["memory"][:-1] + ), + num_gpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["limits"]["nvidia.com/gpu"], + instascale=True if machine_types else False, + image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][ + 0 + ]["image"], + local_interactive=local_interactive, + ) + return Cluster(cluster_config) + + def local_client_url(self): + if self.config.local_interactive == True: + ingress_domain = _get_ingress_domain() + return f"ray://rayclient-{self.config.name}-{self.config.namespace}.{ingress_domain}" + else: + return "None" + def list_all_clusters(namespace: str, print_to_console: bool = True): """ @@ -352,78 +415,120 @@

      Module codeflare_sdk.cluster.cluster

      return app_wrappers +def get_current_namespace(): # pragma: no cover + if api_config_handler() != None: + if os.path.isfile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"): + try: + file = open( + "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r" + ) + active_context = file.readline().strip("\n") + return active_context + except Exception as e: + print("Unable to find current namespace") + return None + else: + print("Unable to find current namespace") + return None + else: + try: + _, active_context = config.list_kube_config_contexts(config_check()) + except Exception as e: + return _kube_api_error_handling(e) + try: + return active_context["context"]["namespace"] + except KeyError: + return None + + +def get_cluster(cluster_name: str, namespace: str = "default"): + try: + config.load_kube_config() + api_instance = client.CustomObjectsApi() + rcs = api_instance.list_namespaced_custom_object( + group="ray.io", + version="v1alpha1", + namespace=namespace, + plural="rayclusters", + ) + except Exception as e: + return _kube_api_error_handling(e) + + for rc in rcs["items"]: + if rc["metadata"]["name"] == cluster_name: + return Cluster.from_k8_cluster_object(rc) + raise FileNotFoundError( + f"Cluster {cluster_name} is not found in {namespace} namespace" + ) + + # private methods +def _get_ingress_domain(): + try: + config.load_kube_config() + api_client = client.CustomObjectsApi(api_config_handler()) + ingress = api_client.get_cluster_custom_object( + "config.openshift.io", "v1", "ingresses", "cluster" + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + return ingress["spec"]["domain"] def _app_wrapper_status(name, namespace="default") -> Optional[AppWrapper]: - cluster = None try: - with oc.project(namespace), oc.timeout(10 * 60): - cluster = oc.selector(f"appwrapper/{name}").object() - except oc.OpenShiftPythonException as osp: # pragma: no cover - msg = osp.msg - if "Expected a single object, but selected 0" in msg: - return cluster - error_msg = osp.result.err() - if not ( - 'the server doesn\'t have a resource type "appwrapper"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise osp - - if cluster: - return _map_to_app_wrapper(cluster) - - return cluster + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + aws = api_instance.list_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for aw in aws["items"]: + if aw["metadata"]["name"] == name: + return _map_to_app_wrapper(aw) + return None def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]: - cluster = None try: - with oc.project(namespace), oc.timeout(10 * 60): - cluster = oc.selector(f"rayclusters/{name}").object() - except oc.OpenShiftPythonException as osp: # pragma: no cover - msg = osp.msg - if "Expected a single object, but selected 0" in msg: - return cluster - error_msg = osp.result.err() - if not ( - 'the server doesn\'t have a resource type "rayclusters"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise osp - - if cluster: - return _map_to_ray_cluster(cluster) - - return cluster + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + rcs = api_instance.list_namespaced_custom_object( + group="ray.io", + version="v1alpha1", + namespace=namespace, + plural="rayclusters", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for rc in rcs["items"]: + if rc["metadata"]["name"] == name: + return _map_to_ray_cluster(rc) + return None def _get_ray_clusters(namespace="default") -> List[RayCluster]: list_of_clusters = [] try: - with oc.project(namespace), oc.timeout(10 * 60): - ray_clusters = oc.selector("rayclusters").objects() - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if ( - 'the server doesn\'t have a resource type "rayclusters"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - else: - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + rcs = api_instance.list_namespaced_custom_object( + group="ray.io", + version="v1alpha1", + namespace=namespace, + plural="rayclusters", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) - for cluster in ray_clusters: - list_of_clusters.append(_map_to_ray_cluster(cluster)) + for rc in rcs["items"]: + list_of_clusters.append(_map_to_ray_cluster(rc)) return list_of_clusters @@ -433,23 +538,18 @@

      Module codeflare_sdk.cluster.cluster

      list_of_app_wrappers = [] try: - with oc.project(namespace), oc.timeout(10 * 60): - app_wrappers = oc.selector("appwrappers").objects() - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if ( - 'the server doesn\'t have a resource type "appwrappers"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - else: - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + aws = api_instance.list_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) - for item in app_wrappers: + for item in aws["items"]: app_wrapper = _map_to_app_wrapper(item) if filter and app_wrapper.status in filter: list_of_app_wrappers.append(app_wrapper) @@ -459,48 +559,58 @@

      Module codeflare_sdk.cluster.cluster

      return list_of_app_wrappers -def _map_to_ray_cluster(cluster) -> Optional[RayCluster]: - cluster_model = cluster.model - if type(cluster_model.status.state) == oc.model.MissingModel: - status = RayClusterStatus.UNKNOWN +def _map_to_ray_cluster(rc) -> Optional[RayCluster]: + if "state" in rc["status"]: + status = RayClusterStatus(rc["status"]["state"].lower()) else: - status = RayClusterStatus(cluster_model.status.state.lower()) + status = RayClusterStatus.UNKNOWN - with oc.project(cluster.namespace()), oc.timeout(10 * 60): - route = ( - oc.selector(f"route/ray-dashboard-{cluster.name()}") - .object() - .model.spec.host - ) + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + routes = api_instance.list_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=rc["metadata"]["namespace"], + plural="routes", + ) + ray_route = None + for route in routes["items"]: + if route["metadata"]["name"] == f"ray-dashboard-{rc['metadata']['name']}": + ray_route = route["spec"]["host"] return RayCluster( - name=cluster.name(), + name=rc["metadata"]["name"], status=status, # for now we are not using autoscaling so same replicas is fine - min_workers=cluster_model.spec.workerGroupSpecs[0].replicas, - max_workers=cluster_model.spec.workerGroupSpecs[0].replicas, - worker_mem_max=cluster_model.spec.workerGroupSpecs[0] - .template.spec.containers[0] - .resources.limits.memory, - worker_mem_min=cluster_model.spec.workerGroupSpecs[0] - .template.spec.containers[0] - .resources.requests.memory, - worker_cpu=cluster_model.spec.workerGroupSpecs[0] - .template.spec.containers[0] - .resources.limits.cpu, + workers=rc["spec"]["workerGroupSpecs"][0]["replicas"], + worker_mem_max=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["limits"]["memory"], + worker_mem_min=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["requests"]["memory"], + worker_cpu=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][ + 0 + ]["resources"]["limits"]["cpu"], worker_gpu=0, # hard to detect currently how many gpus, can override it with what the user asked for - namespace=cluster.namespace(), - dashboard=route, + namespace=rc["metadata"]["namespace"], + dashboard=ray_route, ) -def _map_to_app_wrapper(cluster) -> AppWrapper: - cluster_model = cluster.model +def _map_to_app_wrapper(aw) -> AppWrapper: + if "status" in aw and "canrun" in aw["status"]: + return AppWrapper( + name=aw["metadata"]["name"], + status=AppWrapperStatus(aw["status"]["state"].lower()), + can_run=aw["status"]["canrun"], + job_state=aw["status"]["queuejobstate"], + ) return AppWrapper( - name=cluster.name(), - status=AppWrapperStatus(cluster_model.status.state.lower()), - can_run=cluster_model.status.canrun, - job_state=cluster_model.status.queuejobstate, + name=aw["metadata"]["name"], + status=AppWrapperStatus("queueing"), + can_run=False, + job_state="Still adding to queue", ) @@ -508,12 +618,11 @@

      Module codeflare_sdk.cluster.cluster

      ray = RayCluster( name=cluster.config.name, status=cluster.status(print_to_console=False)[0], - min_workers=cluster.config.min_worker, - max_workers=cluster.config.max_worker, + workers=cluster.config.num_workers, worker_mem_min=cluster.config.min_memory, worker_mem_max=cluster.config.max_memory, worker_cpu=cluster.config.min_cpus, - worker_gpu=cluster.config.gpu, + worker_gpu=cluster.config.num_gpus, namespace=cluster.config.namespace, dashboard=cluster.cluster_dashboard_uri(), ) @@ -529,6 +638,71 @@

      Module codeflare_sdk.cluster.cluster

      Functions

      +
      +def get_cluster(cluster_name: str, namespace: str = 'default') +
      +
      +
      +
      + +Expand source code + +
      def get_cluster(cluster_name: str, namespace: str = "default"):
      +    try:
      +        config.load_kube_config()
      +        api_instance = client.CustomObjectsApi()
      +        rcs = api_instance.list_namespaced_custom_object(
      +            group="ray.io",
      +            version="v1alpha1",
      +            namespace=namespace,
      +            plural="rayclusters",
      +        )
      +    except Exception as e:
      +        return _kube_api_error_handling(e)
      +
      +    for rc in rcs["items"]:
      +        if rc["metadata"]["name"] == cluster_name:
      +            return Cluster.from_k8_cluster_object(rc)
      +    raise FileNotFoundError(
      +        f"Cluster {cluster_name} is not found in {namespace} namespace"
      +    )
      +
      +
      +
      +def get_current_namespace() +
      +
      +
      +
      + +Expand source code + +
      def get_current_namespace():  # pragma: no cover
      +    if api_config_handler() != None:
      +        if os.path.isfile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"):
      +            try:
      +                file = open(
      +                    "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r"
      +                )
      +                active_context = file.readline().strip("\n")
      +                return active_context
      +            except Exception as e:
      +                print("Unable to find current namespace")
      +                return None
      +        else:
      +            print("Unable to find current namespace")
      +            return None
      +    else:
      +        try:
      +            _, active_context = config.list_kube_config_contexts(config_check())
      +        except Exception as e:
      +            return _kube_api_error_handling(e)
      +        try:
      +            return active_context["context"]["namespace"]
      +        except KeyError:
      +            return None
      +
      +
      def list_all_clusters(namespace: str, print_to_console: bool = True)
      @@ -620,8 +794,10 @@

      Classes

      """ if self.config.namespace is None: - self.config.namespace = oc.get_project_name() - if type(self.config.namespace) is not str: + self.config.namespace = get_current_namespace() + if self.config.namespace is None: + print("Please specify with namespace=<your_current_namespace>") + elif type(self.config.namespace) is not str: raise TypeError( f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication." ) @@ -632,8 +808,8 @@

      Classes

      max_cpu = self.config.max_cpus min_memory = self.config.min_memory max_memory = self.config.max_memory - gpu = self.config.gpu - workers = self.config.max_worker + gpu = self.config.num_gpus + workers = self.config.num_workers template = self.config.template image = self.config.image instascale = self.config.instascale @@ -667,15 +843,19 @@

      Classes

      """ namespace = self.config.namespace try: - with oc.project(namespace): - oc.invoke("apply", ["-f", self.app_wrapper_yaml]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if "Unauthorized" in error_msg: - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + with open(self.app_wrapper_yaml) as f: + aw = yaml.load(f, Loader=yaml.FullLoader) + api_instance.create_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + body=aw, + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) def down(self): """ @@ -684,23 +864,17 @@

      Classes

      """ namespace = self.config.namespace try: - with oc.project(namespace): - oc.invoke("delete", ["AppWrapper", self.app_wrapper_name]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if ( - 'the server doesn\'t have a resource type "AppWrapper"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise PermissionError( - "Action not permitted, have you run auth.login()/cluster.up() yet?" - ) - elif "not found" in error_msg: - print("Cluster not found, have you run cluster.up() yet?") - else: - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.delete_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + name=self.app_wrapper_name, + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) def status( self, print_to_console: bool = True @@ -728,9 +902,15 @@

      Classes

      ready = False status = CodeFlareClusterStatus.FAILED # should deleted be separate return status, ready # exit early, no need to check ray status - elif appwrapper.status in [AppWrapperStatus.PENDING]: + elif appwrapper.status in [ + AppWrapperStatus.PENDING, + AppWrapperStatus.QUEUEING, + ]: ready = False - status = CodeFlareClusterStatus.QUEUED + if appwrapper.status == AppWrapperStatus.PENDING: + status = CodeFlareClusterStatus.QUEUED + else: + status = CodeFlareClusterStatus.QUEUEING if print_to_console: pretty_print.print_app_wrappers_status([appwrapper]) return ( @@ -753,7 +933,7 @@

      Classes

      if print_to_console: # overriding the number of gpus with requested - cluster.worker_gpu = self.config.gpu + cluster.worker_gpu = self.config.num_gpus pretty_print.print_cluster_status(cluster) elif print_to_console: if status == CodeFlareClusterStatus.UNKNOWN: @@ -802,16 +982,21 @@

      Classes

      Returns a string containing the cluster's dashboard URI. """ try: - with oc.project(self.config.namespace): - route = oc.invoke( - "get", ["route", "-o", "jsonpath='{$.items[*].spec.host}'"] - ) - route = route.out().split(" ") - route = [x for x in route if f"ray-dashboard-{self.config.name}" in x] - route = route[0].strip().strip("'") - return f"http://{route}" - except: - return "Dashboard route not available yet, have you run cluster.up()?" + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + routes = api_instance.list_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=self.config.namespace, + plural="routes", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for route in routes["items"]: + if route["metadata"]["name"] == f"ray-dashboard-{self.config.name}": + return f"http://{route['spec']['host']}" + return "Dashboard route not available yet, have you run cluster.up()?" def list_jobs(self) -> List: """ @@ -849,7 +1034,56 @@

      Classes

      to_return["working_dir"] = working_dir if requirements: to_return["requirements"] = requirements - return to_return + return to_return + + def from_k8_cluster_object(rc): + machine_types = ( + rc["metadata"]["labels"]["orderedinstance"].split("_") + if "orderedinstance" in rc["metadata"]["labels"] + else [] + ) + local_interactive = ( + "volumeMounts" + in rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0] + ) + cluster_config = ClusterConfiguration( + name=rc["metadata"]["name"], + namespace=rc["metadata"]["namespace"], + machine_types=machine_types, + num_workers=rc["spec"]["workerGroupSpecs"][0]["minReplicas"], + min_cpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["requests"]["cpu"], + max_cpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["limits"]["cpu"], + min_memory=int( + rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][ + "resources" + ]["requests"]["memory"][:-1] + ), + max_memory=int( + rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][ + "resources" + ]["limits"]["memory"][:-1] + ), + num_gpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["resources"]["limits"]["nvidia.com/gpu"], + instascale=True if machine_types else False, + image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][ + 0 + ]["image"], + local_interactive=local_interactive, + ) + return Cluster(cluster_config) + + def local_client_url(self): + if self.config.local_interactive == True: + ingress_domain = _get_ingress_domain() + return f"ray://rayclient-{self.config.name}-{self.config.namespace}.{ingress_domain}" + else: + return "None"

      Class variables

      @@ -874,16 +1108,21 @@

      Methods

      Returns a string containing the cluster's dashboard URI. """ try: - with oc.project(self.config.namespace): - route = oc.invoke( - "get", ["route", "-o", "jsonpath='{$.items[*].spec.host}'"] - ) - route = route.out().split(" ") - route = [x for x in route if f"ray-dashboard-{self.config.name}" in x] - route = route[0].strip().strip("'") - return f"http://{route}" - except: - return "Dashboard route not available yet, have you run cluster.up()?" + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + routes = api_instance.list_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=self.config.namespace, + plural="routes", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + for route in routes["items"]: + if route["metadata"]["name"] == f"ray-dashboard-{self.config.name}": + return f"http://{route['spec']['host']}" + return "Dashboard route not available yet, have you run cluster.up()?"
  • @@ -919,8 +1158,10 @@

    Methods

    """ if self.config.namespace is None: - self.config.namespace = oc.get_project_name() - if type(self.config.namespace) is not str: + self.config.namespace = get_current_namespace() + if self.config.namespace is None: + print("Please specify with namespace=<your_current_namespace>") + elif type(self.config.namespace) is not str: raise TypeError( f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication." ) @@ -931,8 +1172,8 @@

    Methods

    max_cpu = self.config.max_cpus min_memory = self.config.min_memory max_memory = self.config.max_memory - gpu = self.config.gpu - workers = self.config.max_worker + gpu = self.config.num_gpus + workers = self.config.num_workers template = self.config.template image = self.config.image instascale = self.config.instascale @@ -992,23 +1233,69 @@

    Methods

    """ namespace = self.config.namespace try: - with oc.project(namespace): - oc.invoke("delete", ["AppWrapper", self.app_wrapper_name]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if ( - 'the server doesn\'t have a resource type "AppWrapper"' in error_msg - or "forbidden" in error_msg - or "Unauthorized" in error_msg - or "Missing or incomplete configuration" in error_msg - ): - raise PermissionError( - "Action not permitted, have you run auth.login()/cluster.up() yet?" - ) - elif "not found" in error_msg: - print("Cluster not found, have you run cluster.up() yet?") - else: - raise osp
    + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + api_instance.delete_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + name=self.app_wrapper_name, + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + + +
    +def from_k8_cluster_object(rc) +
    +
    +
    +
    + +Expand source code + +
    def from_k8_cluster_object(rc):
    +    machine_types = (
    +        rc["metadata"]["labels"]["orderedinstance"].split("_")
    +        if "orderedinstance" in rc["metadata"]["labels"]
    +        else []
    +    )
    +    local_interactive = (
    +        "volumeMounts"
    +        in rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0]
    +    )
    +    cluster_config = ClusterConfiguration(
    +        name=rc["metadata"]["name"],
    +        namespace=rc["metadata"]["namespace"],
    +        machine_types=machine_types,
    +        num_workers=rc["spec"]["workerGroupSpecs"][0]["minReplicas"],
    +        min_cpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
    +            "containers"
    +        ][0]["resources"]["requests"]["cpu"],
    +        max_cpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
    +            "containers"
    +        ][0]["resources"]["limits"]["cpu"],
    +        min_memory=int(
    +            rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][
    +                "resources"
    +            ]["requests"]["memory"][:-1]
    +        ),
    +        max_memory=int(
    +            rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][
    +                "resources"
    +            ]["limits"]["memory"][:-1]
    +        ),
    +        num_gpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
    +            "containers"
    +        ][0]["resources"]["limits"]["nvidia.com/gpu"],
    +        instascale=True if machine_types else False,
    +        image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
    +            0
    +        ]["image"],
    +        local_interactive=local_interactive,
    +    )
    +    return Cluster(cluster_config)
    @@ -1065,6 +1352,23 @@

    Methods

    return client.list_jobs()
    +
    +def local_client_url(self) +
    +
    +
    +
    + +Expand source code + +
    def local_client_url(self):
    +    if self.config.local_interactive == True:
    +        ingress_domain = _get_ingress_domain()
    +        return f"ray://rayclient-{self.config.name}-{self.config.namespace}.{ingress_domain}"
    +    else:
    +        return "None"
    +
    +
    def status(self, print_to_console: bool = True) ‑> Tuple[CodeFlareClusterStatus, bool]
    @@ -1101,9 +1405,15 @@

    Methods

    ready = False status = CodeFlareClusterStatus.FAILED # should deleted be separate return status, ready # exit early, no need to check ray status - elif appwrapper.status in [AppWrapperStatus.PENDING]: + elif appwrapper.status in [ + AppWrapperStatus.PENDING, + AppWrapperStatus.QUEUEING, + ]: ready = False - status = CodeFlareClusterStatus.QUEUED + if appwrapper.status == AppWrapperStatus.PENDING: + status = CodeFlareClusterStatus.QUEUED + else: + status = CodeFlareClusterStatus.QUEUEING if print_to_console: pretty_print.print_app_wrappers_status([appwrapper]) return ( @@ -1126,7 +1436,7 @@

    Methods

    if print_to_console: # overriding the number of gpus with requested - cluster.worker_gpu = self.config.gpu + cluster.worker_gpu = self.config.num_gpus pretty_print.print_cluster_status(cluster) elif print_to_console: if status == CodeFlareClusterStatus.UNKNOWN: @@ -1178,15 +1488,19 @@

    Methods

    """ namespace = self.config.namespace try: - with oc.project(namespace): - oc.invoke("apply", ["-f", self.app_wrapper_yaml]) - except oc.OpenShiftPythonException as osp: # pragma: no cover - error_msg = osp.result.err() - if "Unauthorized" in error_msg: - raise PermissionError( - "Action not permitted, have you put in correct/up-to-date auth credentials?" - ) - raise osp + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + with open(self.app_wrapper_yaml) as f: + aw = yaml.load(f, Loader=yaml.FullLoader) + api_instance.create_namespaced_custom_object( + group="mcad.ibm.com", + version="v1beta1", + namespace=namespace, + plural="appwrappers", + body=aw, + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e)
    @@ -1240,6 +1554,8 @@

    Index

  • Functions

    @@ -1254,9 +1570,11 @@

    create_app_wrapper

  • details
  • down
  • +
  • from_k8_cluster_object
  • job_logs
  • job_status
  • list_jobs
  • +
  • local_client_url
  • status
  • torchx_config
  • torchx_scheduler
  • diff --git a/docs/cluster/config.html b/docs/cluster/config.html index 12342486a..168d252b2 100644 --- a/docs/cluster/config.html +++ b/docs/cluster/config.html @@ -51,9 +51,7 @@

    Module codeflare_sdk.cluster.config

    """ from dataclasses import dataclass, field -from .auth import Authentication import pathlib -import openshift dir = pathlib.Path(__file__).parent.parent.resolve() @@ -71,15 +69,14 @@

    Module codeflare_sdk.cluster.config

    machine_types: list = field(default_factory=list) # ["m4.xlarge", "g4dn.xlarge"] min_cpus: int = 1 max_cpus: int = 1 - min_worker: int = 1 - max_worker: int = 1 + num_workers: int = 1 min_memory: int = 2 max_memory: int = 2 - gpu: int = 0 + num_gpus: int = 0 template: str = f"{dir}/templates/base-template.yaml" instascale: bool = False envs: dict = field(default_factory=dict) - image: str = "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103" + image: str = "quay.io/project-codeflare/ray:2.5.0-py38-cu116" local_interactive: bool = False image_pull_secrets: list = field(default_factory=list)
    @@ -95,7 +92,7 @@

    Classes

    class ClusterConfiguration -(name: str, namespace: str = None, head_info: list = <factory>, machine_types: list = <factory>, min_cpus: int = 1, max_cpus: int = 1, min_worker: int = 1, max_worker: int = 1, min_memory: int = 2, max_memory: int = 2, gpu: int = 0, template: str = '/home/runner/work/codeflare-sdk/codeflare-sdk/src/codeflare_sdk/templates/base-template.yaml', instascale: bool = False, envs: dict = <factory>, image: str = 'ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103', local_interactive: bool = False, image_pull_secrets: list = <factory>) +(name: str, namespace: str = None, head_info: list = <factory>, machine_types: list = <factory>, min_cpus: int = 1, max_cpus: int = 1, num_workers: int = 1, min_memory: int = 2, max_memory: int = 2, num_gpus: int = 0, template: str = '/home/runner/work/codeflare-sdk/codeflare-sdk/src/codeflare_sdk/templates/base-template.yaml', instascale: bool = False, envs: dict = <factory>, image: str = 'quay.io/project-codeflare/ray:2.5.0-py38-cu116', local_interactive: bool = False, image_pull_secrets: list = <factory>)

    This dataclass is used to specify resource requirements and other details, and @@ -116,15 +113,14 @@

    Classes

    machine_types: list = field(default_factory=list) # ["m4.xlarge", "g4dn.xlarge"] min_cpus: int = 1 max_cpus: int = 1 - min_worker: int = 1 - max_worker: int = 1 + num_workers: int = 1 min_memory: int = 2 max_memory: int = 2 - gpu: int = 0 + num_gpus: int = 0 template: str = f"{dir}/templates/base-template.yaml" instascale: bool = False envs: dict = field(default_factory=dict) - image: str = "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103" + image: str = "quay.io/project-codeflare/ray:2.5.0-py38-cu116" local_interactive: bool = False image_pull_secrets: list = field(default_factory=list) @@ -134,10 +130,6 @@

    Class variables

    -
    var gpu : int
    -
    -
    -
    var head_info : list
    @@ -170,27 +162,27 @@

    Class variables

    -
    var max_worker : int
    +
    var min_cpus : int
    -
    var min_cpus : int
    +
    var min_memory : int
    -
    var min_memory : int
    +
    var name : str
    -
    var min_worker : int
    +
    var namespace : str
    -
    var name : str
    +
    var num_gpus : int
    -
    var namespace : str
    +
    var num_workers : int
    @@ -220,7 +212,6 @@

    Index

    ClusterConfiguration

    diff --git a/docs/cluster/model.html b/docs/cluster/model.html index 4097aef2b..7d911255b 100644 --- a/docs/cluster/model.html +++ b/docs/cluster/model.html @@ -72,6 +72,7 @@

    Module codeflare_sdk.cluster.model

    Defines the possible reportable states of an AppWrapper. """ + QUEUEING = "queueing" PENDING = "pending" RUNNING = "running" FAILED = "failed" @@ -88,8 +89,9 @@

    Module codeflare_sdk.cluster.model

    READY = 1 STARTING = 2 QUEUED = 3 - FAILED = 4 - UNKNOWN = 5 + QUEUEING = 4 + FAILED = 5 + UNKNOWN = 6 @dataclass @@ -100,8 +102,7 @@

    Module codeflare_sdk.cluster.model

    name: str status: RayClusterStatus - min_workers: int - max_workers: int + workers: int worker_mem_min: str worker_mem_max: str worker_cpu: int @@ -186,6 +187,7 @@

    Class variables

    Defines the possible reportable states of an AppWrapper. """ + QUEUEING = "queueing" PENDING = "pending" RUNNING = "running" FAILED = "failed" @@ -215,6 +217,10 @@

    Class variables

    +
    var QUEUEING
    +
    +
    +
    var RUNNING
    @@ -243,8 +249,9 @@

    Class variables

    READY = 1 STARTING = 2 QUEUED = 3 - FAILED = 4 - UNKNOWN = 5 + QUEUEING = 4 + FAILED = 5 + UNKNOWN = 6

    Ancestors

      @@ -260,6 +267,10 @@

      Class variables

      +
      var QUEUEING
      +
      +
      +
      var READY
      @@ -276,7 +287,7 @@

      Class variables

      class RayCluster -(name: str, status: RayClusterStatus, min_workers: int, max_workers: int, worker_mem_min: str, worker_mem_max: str, worker_cpu: int, worker_gpu: int, namespace: str, dashboard: str) +(name: str, status: RayClusterStatus, workers: int, worker_mem_min: str, worker_mem_max: str, worker_cpu: int, worker_gpu: int, namespace: str, dashboard: str)

      For storing information about a Ray cluster.

      @@ -291,8 +302,7 @@

      Class variables

      name: str status: RayClusterStatus - min_workers: int - max_workers: int + workers: int worker_mem_min: str worker_mem_max: str worker_cpu: int @@ -306,14 +316,6 @@

      Class variables

      -
      var max_workers : int
      -
      -
      -
      -
      var min_workers : int
      -
      -
      -
      var name : str
      @@ -342,6 +344,10 @@

      Class variables

      +
      var workers : int
      +
      +
      +
    @@ -421,15 +427,17 @@

    DELETED
  • FAILED
  • PENDING
  • +
  • QUEUEING
  • RUNNING
  • RUNNING_HOLD_COMPLETION
  • CodeFlareClusterStatus

    -
      +
      • FAILED
      • QUEUED
      • +
      • QUEUEING
      • READY
      • STARTING
      • UNKNOWN
      • @@ -439,8 +447,6 @@

        RayCluster

      • diff --git a/docs/job/jobs.html b/docs/job/jobs.html index 74e7bb695..96ea47449 100644 --- a/docs/job/jobs.html +++ b/docs/job/jobs.html @@ -45,13 +45,13 @@

        Module codeflare_sdk.job.jobs

        from typing import TYPE_CHECKING, Optional, Dict, List from pathlib import Path -import openshift as oc from torchx.components.dist import ddp from torchx.runner import get_runner from torchx.specs import AppHandle, parse_app_handle, AppDryRunInfo if TYPE_CHECKING: from ..cluster.cluster import Cluster +from ..cluster.cluster import get_current_namespace all_jobs: List["Job"] = [] torchx_runner = get_runner() @@ -119,7 +119,7 @@

        Module codeflare_sdk.job.jobs

        self.workspace = workspace def _dry_run(self, cluster: "Cluster"): - j = f"{cluster.config.max_worker}x{max(cluster.config.gpu, 1)}" # # of proc. = # of gpus + j = f"{cluster.config.num_workers}x{max(cluster.config.num_gpus, 1)}" # # of proc. = # of gpus return torchx_runner.dryrun( app=ddp( *self.script_args, @@ -128,7 +128,7 @@

        Module codeflare_sdk.job.jobs

        name=self.name, h=self.h, cpu=self.cpu if self.cpu is not None else cluster.config.max_cpus, - gpu=self.gpu if self.gpu is not None else cluster.config.gpu, + gpu=self.gpu if self.gpu is not None else cluster.config.num_gpus, memMB=self.memMB if self.memMB is not None else cluster.config.max_memory * 1024, @@ -152,7 +152,7 @@

        Module codeflare_sdk.job.jobs

        def _dry_run_no_cluster(self): if self.scheduler_args is not None: if self.scheduler_args.get("namespace") is None: - self.scheduler_args["namespace"] = oc.get_project_name() + self.scheduler_args["namespace"] = get_current_namespace() return torchx_runner.dryrun( app=ddp( *self.script_args, @@ -359,7 +359,7 @@

        Methods

        self.workspace = workspace def _dry_run(self, cluster: "Cluster"): - j = f"{cluster.config.max_worker}x{max(cluster.config.gpu, 1)}" # # of proc. = # of gpus + j = f"{cluster.config.num_workers}x{max(cluster.config.num_gpus, 1)}" # # of proc. = # of gpus return torchx_runner.dryrun( app=ddp( *self.script_args, @@ -368,7 +368,7 @@

        Methods

        name=self.name, h=self.h, cpu=self.cpu if self.cpu is not None else cluster.config.max_cpus, - gpu=self.gpu if self.gpu is not None else cluster.config.gpu, + gpu=self.gpu if self.gpu is not None else cluster.config.num_gpus, memMB=self.memMB if self.memMB is not None else cluster.config.max_memory * 1024, @@ -392,7 +392,7 @@

        Methods

        def _dry_run_no_cluster(self): if self.scheduler_args is not None: if self.scheduler_args.get("namespace") is None: - self.scheduler_args["namespace"] = oc.get_project_name() + self.scheduler_args["namespace"] = get_current_namespace() return torchx_runner.dryrun( app=ddp( *self.script_args, diff --git a/docs/utils/generate_cert.html b/docs/utils/generate_cert.html index 9360f2c5f..b41846f95 100644 --- a/docs/utils/generate_cert.html +++ b/docs/utils/generate_cert.html @@ -47,6 +47,7 @@

        Module codeflare_sdk.utils.generate_cert

        from cryptography import x509 from cryptography.x509.oid import NameOID import datetime +from ..cluster.auth import config_check, api_config_handler from kubernetes import client, config @@ -110,8 +111,8 @@

        Module codeflare_sdk.utils.generate_cert

        # Similar to: # oc get secret ca-secret-<cluster-name> -o template='{{index .data "ca.key"}}' # oc get secret ca-secret-<cluster-name> -o template='{{index .data "ca.crt"}}'|base64 -d > ${TLSDIR}/ca.crt - config.load_kube_config() - v1 = client.CoreV1Api() + config_check() + v1 = client.CoreV1Api(api_config_handler()) secret = v1.read_namespaced_secret(f"ca-secret-{cluster_name}", namespace).data ca_cert = secret.get("ca.crt") ca_key = secret.get("ca.key") @@ -291,8 +292,8 @@

        Functions

        # Similar to: # oc get secret ca-secret-<cluster-name> -o template='{{index .data "ca.key"}}' # oc get secret ca-secret-<cluster-name> -o template='{{index .data "ca.crt"}}'|base64 -d > ${TLSDIR}/ca.crt - config.load_kube_config() - v1 = client.CoreV1Api() + config_check() + v1 = client.CoreV1Api(api_config_handler()) secret = v1.read_namespaced_secret(f"ca-secret-{cluster_name}", namespace).data ca_cert = secret.get("ca.crt") ca_key = secret.get("ca.key") diff --git a/docs/utils/generate_yaml.html b/docs/utils/generate_yaml.html index a0a05d292..f8e6dcb97 100644 --- a/docs/utils/generate_yaml.html +++ b/docs/utils/generate_yaml.html @@ -52,7 +52,9 @@

        Module codeflare_sdk.utils.generate_yaml

        import sys import argparse import uuid -import openshift as oc +from kubernetes import client, config +from .kube_api_helpers import _kube_api_error_handling +from ..cluster.auth import api_config_handler def read_template(template): @@ -279,12 +281,16 @@

        Module codeflare_sdk.utils.generate_yaml

        ][0].get("command")[2] command = command.replace("deployment-name", cluster_name) - - server_name = ( - oc.whoami("--show-server").split(":")[1].split("//")[1].replace("api", "apps") - ) - - command = command.replace("server-name", server_name) + try: + config.load_kube_config() + api_client = client.CustomObjectsApi(api_config_handler()) + ingress = api_client.get_cluster_custom_object( + "config.openshift.io", "v1", "ingresses", "cluster" + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + domain = ingress["spec"]["domain"] + command = command.replace("server-name", domain) item["generictemplate"]["spec"]["headGroupSpec"]["template"]["spec"][ "initContainers" @@ -292,26 +298,61 @@

        Module codeflare_sdk.utils.generate_yaml

        def disable_raycluster_tls(resources): - del resources["GenericItems"][0]["generictemplate"]["spec"]["headGroupSpec"][ - "template" - ]["spec"]["volumes"] - del resources["GenericItems"][0]["generictemplate"]["spec"]["headGroupSpec"][ - "template" - ]["spec"]["containers"][0]["volumeMounts"] - del resources["GenericItems"][0]["generictemplate"]["spec"]["headGroupSpec"][ - "template" - ]["spec"]["initContainers"] - del resources["GenericItems"][0]["generictemplate"]["spec"]["workerGroupSpecs"][0][ - "template" - ]["spec"]["volumes"] - del resources["GenericItems"][0]["generictemplate"]["spec"]["workerGroupSpecs"][0][ - "template" - ]["spec"]["containers"][0]["volumeMounts"] - del resources["GenericItems"][0]["generictemplate"]["spec"]["workerGroupSpecs"][0][ - "template" - ]["spec"]["initContainers"][1] - del resources["GenericItems"][3] # rayclient route - del resources["GenericItems"][2] # ca-secret + generic_template_spec = resources["GenericItems"][0]["generictemplate"]["spec"] + + if "volumes" in generic_template_spec["headGroupSpec"]["template"]["spec"]: + del generic_template_spec["headGroupSpec"]["template"]["spec"]["volumes"] + + if ( + "volumeMounts" + in generic_template_spec["headGroupSpec"]["template"]["spec"]["containers"][0] + ): + del generic_template_spec["headGroupSpec"]["template"]["spec"]["containers"][0][ + "volumeMounts" + ] + + if "initContainers" in generic_template_spec["headGroupSpec"]["template"]["spec"]: + del generic_template_spec["headGroupSpec"]["template"]["spec"]["initContainers"] + + if "volumes" in generic_template_spec["workerGroupSpecs"][0]["template"]["spec"]: + del generic_template_spec["workerGroupSpecs"][0]["template"]["spec"]["volumes"] + + if ( + "volumeMounts" + in generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0] + ): + del generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["volumeMounts"] + + for i in range( + len( + generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "initContainers" + ] + ) + ): + if ( + generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "initContainers" + ][i]["name"] + == "create-cert" + ): + del generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "initContainers" + ][i] + + updated_items = [] + for i in resources["GenericItems"][:]: + if "rayclient-deployment-name" in i["generictemplate"]["metadata"]["name"]: + continue + if "ca-secret-deployment-name" in i["generictemplate"]["metadata"]["name"]: + continue + updated_items.append(i) + + resources["GenericItems"] = updated_items def write_user_appwrapper(user_yaml, output_file_name): @@ -368,135 +409,7 @@

        Module codeflare_sdk.utils.generate_yaml

        disable_raycluster_tls(resources["resources"]) outfile = appwrapper_name + ".yaml" write_user_appwrapper(user_yaml, outfile) - return outfile - - -def main(): # pragma: no cover - parser = argparse.ArgumentParser(description="Generate user AppWrapper") - parser.add_argument( - "--name", - required=False, - default="", - help="User selected name for AppWrapper and Ray Cluster (auto-generated if not provided)", - ) - parser.add_argument( - "--min-cpu", - type=int, - required=True, - help="min number of CPU(s) in a worker required for running job", - ) - parser.add_argument( - "--max-cpu", - type=int, - required=True, - help="max number of CPU(s) in a worker required for running job", - ) - parser.add_argument( - "--min-memory", - type=int, - required=True, - help="min RAM required in a worker for running job, in GB", - ) - parser.add_argument( - "--max-memory", - type=int, - required=True, - help="max RAM required in a worker for running job, in GB", - ) - parser.add_argument( - "--gpu", - type=int, - required=True, - help="GPU(s) required in a worker for running job", - ) - parser.add_argument( - "--workers", - type=int, - required=True, - help="How many workers are required in the cluster", - ) - parser.add_argument( - "--template", required=True, help="Template AppWrapper yaml file" - ) - parser.add_argument( - "--image", - required=False, - default="rayproject/ray:latest", - help="Ray image to be used (defaults to rayproject/ray:latest)", - ) - parser.add_argument( - "--instascale", - default=False, - required=False, - action="store_true", - help="Indicates that instascale is installed on the cluster", - ) - parser.add_argument( - "--instance-types", - type=str, - nargs="+", - default=[], - required=False, - help="Head,worker instance types (space separated)", - ) - parser.add_argument( - "--namespace", - required=False, - default="default", - help="Set the kubernetes namespace you want to deploy your cluster to. Default. If left blank, uses the 'default' namespace", - ) - parser.add_argument( - "--local-interactive", - required=False, - default=False, - help="Enable local interactive mode", - ) - parser.add_argument( - "--image-pull-secrets", - required=False, - default=[], - help="Set image pull secrets for private registries", - ) - - args = parser.parse_args() - name = args.name - min_cpu = args.min_cpu - max_cpu = args.max_cpu - min_memory = args.min_memory - max_memory = args.max_memory - gpu = args.gpu - workers = args.workers - template = args.template - image = args.image - instascale = args.instascale - instance_types = args.instance_types - namespace = args.namespace - local_interactive = args.local_interactive - env = {} - image_pull_secrets = args.image_pull_secrets - - outfile = generate_appwrapper( - name, - namespace, - min_cpu, - max_cpu, - min_memory, - max_memory, - gpu, - workers, - template, - image, - instascale, - instance_types, - local_interactive, - env, - image_pull_secrets, - ) - return outfile - - -if __name__ == "__main__": # pragma: no cover - main()
        + return outfile
  • @@ -516,26 +429,61 @@

    Functions

    Expand source code
    def disable_raycluster_tls(resources):
    -    del resources["GenericItems"][0]["generictemplate"]["spec"]["headGroupSpec"][
    -        "template"
    -    ]["spec"]["volumes"]
    -    del resources["GenericItems"][0]["generictemplate"]["spec"]["headGroupSpec"][
    -        "template"
    -    ]["spec"]["containers"][0]["volumeMounts"]
    -    del resources["GenericItems"][0]["generictemplate"]["spec"]["headGroupSpec"][
    -        "template"
    -    ]["spec"]["initContainers"]
    -    del resources["GenericItems"][0]["generictemplate"]["spec"]["workerGroupSpecs"][0][
    -        "template"
    -    ]["spec"]["volumes"]
    -    del resources["GenericItems"][0]["generictemplate"]["spec"]["workerGroupSpecs"][0][
    -        "template"
    -    ]["spec"]["containers"][0]["volumeMounts"]
    -    del resources["GenericItems"][0]["generictemplate"]["spec"]["workerGroupSpecs"][0][
    -        "template"
    -    ]["spec"]["initContainers"][1]
    -    del resources["GenericItems"][3]  # rayclient route
    -    del resources["GenericItems"][2]  # ca-secret
    + generic_template_spec = resources["GenericItems"][0]["generictemplate"]["spec"] + + if "volumes" in generic_template_spec["headGroupSpec"]["template"]["spec"]: + del generic_template_spec["headGroupSpec"]["template"]["spec"]["volumes"] + + if ( + "volumeMounts" + in generic_template_spec["headGroupSpec"]["template"]["spec"]["containers"][0] + ): + del generic_template_spec["headGroupSpec"]["template"]["spec"]["containers"][0][ + "volumeMounts" + ] + + if "initContainers" in generic_template_spec["headGroupSpec"]["template"]["spec"]: + del generic_template_spec["headGroupSpec"]["template"]["spec"]["initContainers"] + + if "volumes" in generic_template_spec["workerGroupSpecs"][0]["template"]["spec"]: + del generic_template_spec["workerGroupSpecs"][0]["template"]["spec"]["volumes"] + + if ( + "volumeMounts" + in generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0] + ): + del generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "containers" + ][0]["volumeMounts"] + + for i in range( + len( + generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "initContainers" + ] + ) + ): + if ( + generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "initContainers" + ][i]["name"] + == "create-cert" + ): + del generic_template_spec["workerGroupSpecs"][0]["template"]["spec"][ + "initContainers" + ][i] + + updated_items = [] + for i in resources["GenericItems"][:]: + if "rayclient-deployment-name" in i["generictemplate"]["metadata"]["name"]: + continue + if "ca-secret-deployment-name" in i["generictemplate"]["metadata"]["name"]: + continue + updated_items.append(i) + + resources["GenericItems"] = updated_items
    @@ -573,12 +521,16 @@

    Functions

    ][0].get("command")[2] command = command.replace("deployment-name", cluster_name) - - server_name = ( - oc.whoami("--show-server").split(":")[1].split("//")[1].replace("api", "apps") - ) - - command = command.replace("server-name", server_name) + try: + config.load_kube_config() + api_client = client.CustomObjectsApi(api_config_handler()) + ingress = api_client.get_cluster_custom_object( + "config.openshift.io", "v1", "ingresses", "cluster" + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + domain = ingress["spec"]["domain"] + command = command.replace("server-name", domain) item["generictemplate"]["spec"]["headGroupSpec"]["template"]["spec"][ "initContainers" @@ -664,139 +616,6 @@

    Functions

    return outfile
    -
    -def main() -
    -
    -
    -
    - -Expand source code - -
    def main():  # pragma: no cover
    -    parser = argparse.ArgumentParser(description="Generate user AppWrapper")
    -    parser.add_argument(
    -        "--name",
    -        required=False,
    -        default="",
    -        help="User selected name for AppWrapper and Ray Cluster (auto-generated if not provided)",
    -    )
    -    parser.add_argument(
    -        "--min-cpu",
    -        type=int,
    -        required=True,
    -        help="min number of CPU(s) in a worker required for running job",
    -    )
    -    parser.add_argument(
    -        "--max-cpu",
    -        type=int,
    -        required=True,
    -        help="max number of CPU(s) in a worker required for running job",
    -    )
    -    parser.add_argument(
    -        "--min-memory",
    -        type=int,
    -        required=True,
    -        help="min RAM required in a worker for running job, in GB",
    -    )
    -    parser.add_argument(
    -        "--max-memory",
    -        type=int,
    -        required=True,
    -        help="max RAM required in a worker for running job, in GB",
    -    )
    -    parser.add_argument(
    -        "--gpu",
    -        type=int,
    -        required=True,
    -        help="GPU(s) required in a worker for running job",
    -    )
    -    parser.add_argument(
    -        "--workers",
    -        type=int,
    -        required=True,
    -        help="How many workers are required in the cluster",
    -    )
    -    parser.add_argument(
    -        "--template", required=True, help="Template AppWrapper yaml file"
    -    )
    -    parser.add_argument(
    -        "--image",
    -        required=False,
    -        default="rayproject/ray:latest",
    -        help="Ray image to be used (defaults to rayproject/ray:latest)",
    -    )
    -    parser.add_argument(
    -        "--instascale",
    -        default=False,
    -        required=False,
    -        action="store_true",
    -        help="Indicates that instascale is installed on the cluster",
    -    )
    -    parser.add_argument(
    -        "--instance-types",
    -        type=str,
    -        nargs="+",
    -        default=[],
    -        required=False,
    -        help="Head,worker instance types (space separated)",
    -    )
    -    parser.add_argument(
    -        "--namespace",
    -        required=False,
    -        default="default",
    -        help="Set the kubernetes namespace you want to deploy your cluster to. Default. If left blank, uses the 'default' namespace",
    -    )
    -    parser.add_argument(
    -        "--local-interactive",
    -        required=False,
    -        default=False,
    -        help="Enable local interactive mode",
    -    )
    -    parser.add_argument(
    -        "--image-pull-secrets",
    -        required=False,
    -        default=[],
    -        help="Set image pull secrets for private registries",
    -    )
    -
    -    args = parser.parse_args()
    -    name = args.name
    -    min_cpu = args.min_cpu
    -    max_cpu = args.max_cpu
    -    min_memory = args.min_memory
    -    max_memory = args.max_memory
    -    gpu = args.gpu
    -    workers = args.workers
    -    template = args.template
    -    image = args.image
    -    instascale = args.instascale
    -    instance_types = args.instance_types
    -    namespace = args.namespace
    -    local_interactive = args.local_interactive
    -    env = {}
    -    image_pull_secrets = args.image_pull_secrets
    -
    -    outfile = generate_appwrapper(
    -        name,
    -        namespace,
    -        min_cpu,
    -        max_cpu,
    -        min_memory,
    -        max_memory,
    -        gpu,
    -        workers,
    -        template,
    -        image,
    -        instascale,
    -        instance_types,
    -        local_interactive,
    -        env,
    -        image_pull_secrets,
    -    )
    -    return outfile
    -
    -
    def read_template(template)
    @@ -1138,7 +957,6 @@

    Index

  • enable_local_interactive
  • gen_names
  • generate_appwrapper
  • -
  • main
  • read_template
  • update_affinity
  • update_ca_secret
  • diff --git a/docs/utils/index.html b/docs/utils/index.html index 9ce65d9ef..1eb081d2b 100644 --- a/docs/utils/index.html +++ b/docs/utils/index.html @@ -35,6 +35,11 @@

    Sub-modules

    This sub-module exists primarily to be used internally by the Cluster object (in the cluster sub-module) for AppWrapper generation.

    +
    codeflare_sdk.utils.kube_api_helpers
    +
    +

    This sub-module exists primarily to be used internally for any Kubernetes +API error handling or wrapping.

    +
    codeflare_sdk.utils.pretty_print

    This sub-module exists primarily to be used internally by the Cluster object @@ -64,6 +69,7 @@

    Index

    diff --git a/docs/utils/kube_api_helpers.html b/docs/utils/kube_api_helpers.html new file mode 100644 index 000000000..4c2ecb781 --- /dev/null +++ b/docs/utils/kube_api_helpers.html @@ -0,0 +1,105 @@ + + + + + + +codeflare_sdk.utils.kube_api_helpers API documentation + + + + + + + + + + + +
    +
    +
    +

    Module codeflare_sdk.utils.kube_api_helpers

    +
    +
    +

    This sub-module exists primarily to be used internally for any Kubernetes +API error handling or wrapping.

    +
    + +Expand source code + +
    # Copyright 2022 IBM, Red Hat
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#      http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +"""
    +This sub-module exists primarily to be used internally for any Kubernetes
    +API error handling or wrapping.
    +"""
    +
    +import executing
    +from kubernetes import client, config
    +
    +
    +# private methods
    +def _kube_api_error_handling(e: Exception):  # pragma: no cover
    +    perm_msg = (
    +        "Action not permitted, have you put in correct/up-to-date auth credentials?"
    +    )
    +    nf_msg = "No instances found, nothing to be done."
    +    exists_msg = "Resource with this name already exists."
    +    if type(e) == config.ConfigException:
    +        raise PermissionError(perm_msg)
    +    if type(e) == executing.executing.NotOneValueFound:
    +        print(nf_msg)
    +        return
    +    if type(e) == client.ApiException:
    +        if e.reason == "Not Found":
    +            print(nf_msg)
    +            return
    +        elif e.reason == "Unauthorized" or e.reason == "Forbidden":
    +            raise PermissionError(perm_msg)
    +        elif e.reason == "Conflict":
    +            raise FileExistsError(exists_msg)
    +    raise e
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    + +
    + + + diff --git a/docs/utils/pretty_print.html b/docs/utils/pretty_print.html index bc45a951b..5ff38db14 100644 --- a/docs/utils/pretty_print.html +++ b/docs/utils/pretty_print.html @@ -146,8 +146,7 @@

    Module codeflare_sdk.utils.pretty_print

    ) name = cluster.name dashboard = cluster.dashboard - mincount = str(cluster.min_workers) - maxcount = str(cluster.max_workers) + workers = str(cluster.workers) memory = str(cluster.worker_mem_min) + "~" + str(cluster.worker_mem_max) cpu = str(cluster.worker_cpu) gpu = str(cluster.worker_gpu) @@ -173,10 +172,9 @@

    Module codeflare_sdk.utils.pretty_print

    #'table1' to display the worker counts table1 = Table(box=None) table1.add_row() - table1.add_column("Min", style="cyan", no_wrap=True) - table1.add_column("Max", style="magenta") + table1.add_column("# Workers", style="magenta") table1.add_row() - table1.add_row(mincount, maxcount) + table1.add_row(workers) table1.add_row() #'table2' to display the worker resources @@ -334,8 +332,7 @@

    Functions

    ) name = cluster.name dashboard = cluster.dashboard - mincount = str(cluster.min_workers) - maxcount = str(cluster.max_workers) + workers = str(cluster.workers) memory = str(cluster.worker_mem_min) + "~" + str(cluster.worker_mem_max) cpu = str(cluster.worker_cpu) gpu = str(cluster.worker_gpu) @@ -361,10 +358,9 @@

    Functions

    #'table1' to display the worker counts table1 = Table(box=None) table1.add_row() - table1.add_column("Min", style="cyan", no_wrap=True) - table1.add_column("Max", style="magenta") + table1.add_column("# Workers", style="magenta") table1.add_row() - table1.add_row(mincount, maxcount) + table1.add_row(workers) table1.add_row() #'table2' to display the worker resources