|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import os |
| 4 | +import shutil |
| 5 | +import subprocess |
| 6 | +from urllib.parse import urlparse |
| 7 | + |
| 8 | +import requests |
| 9 | +import yaml |
| 10 | + |
| 11 | +from alpacloud.crdvis.models import CustomResourceDefinition |
| 12 | + |
| 13 | + |
| 14 | +class CRDReadError(Exception): |
| 15 | + """Exception raised when there is an error reading a CRD.""" |
| 16 | + |
| 17 | + pass |
| 18 | + |
| 19 | + |
| 20 | +def read_path(path: str) -> CustomResourceDefinition: |
| 21 | + """ |
| 22 | + Read a path-like object to fetch a CRD. |
| 23 | +
|
| 24 | + Raises: |
| 25 | + CRDReadError: If there is an error reading the CRD. |
| 26 | + """ |
| 27 | + if path.startswith("http://") or path.startswith("https://"): |
| 28 | + req = requests.Request("GET", path) |
| 29 | + |
| 30 | + url = urlparse(req.url) |
| 31 | + if url.netloc == "github.com": |
| 32 | + req.params["raw"] = "true" |
| 33 | + |
| 34 | + response = requests.Session().send(req.prepare(), timeout=30) |
| 35 | + |
| 36 | + if not response.ok: |
| 37 | + raise CRDReadError(f"Failed to fetch CRD from {path}: {response.status_code}") |
| 38 | + content = response.text |
| 39 | + elif path.startswith("file://") or os.path.exists(path): |
| 40 | + disk_path = path.rsplit("://", 1)[-1] |
| 41 | + if not os.path.exists(disk_path): |
| 42 | + raise CRDReadError(f"File not found: {disk_path}") |
| 43 | + |
| 44 | + with open(disk_path, "r", encoding="utf-8") as f: |
| 45 | + content = f.read() |
| 46 | + |
| 47 | + elif path.startswith("kubectl://"): |
| 48 | + kubectl_crd = path.rsplit("://", 1)[-1] |
| 49 | + kubectl_exe = shutil.which("kubectl") |
| 50 | + if not kubectl_exe: |
| 51 | + raise CRDReadError("kubectl is not installed.") |
| 52 | + |
| 53 | + try: |
| 54 | + content = subprocess.check_output([kubectl_exe, "get", "-o", "yaml", "crd", kubectl_crd], timeout=30).decode("utf-8") |
| 55 | + except subprocess.SubprocessError as e: |
| 56 | + try: |
| 57 | + crds = subprocess.check_output([kubectl_exe, "get", "crd"], timeout=30) |
| 58 | + if crds: |
| 59 | + error_msg = f"crd is not available in cluster {kubectl_crd}" |
| 60 | + else: |
| 61 | + error_msg = f"Failed to fetch CRDs {kubectl_crd}: {e}" |
| 62 | + |
| 63 | + except subprocess.SubprocessError as e: |
| 64 | + error_msg = f"Failed to fetch CRDs {kubectl_crd}: {e}" |
| 65 | + raise CRDReadError(error_msg) |
| 66 | + else: |
| 67 | + content = path |
| 68 | + |
| 69 | + if not content: |
| 70 | + raise CRDReadError("Empty content") |
| 71 | + doc = yaml.safe_load(content) |
| 72 | + return CustomResourceDefinition.model_validate(doc) |
0 commit comments