|
44 | 44 | ) |
45 | 45 | from airflow_breeze.commands.production_image_commands import run_build_production_image |
46 | 46 | from airflow_breeze.global_constants import ( |
| 47 | + ALLOWED_AIRFLOW_KUBERNETES_COMPONENTS, |
47 | 48 | ALLOWED_EXECUTORS, |
48 | 49 | ALLOWED_KUBERNETES_VERSIONS, |
49 | 50 | CELERY_EXECUTOR, |
50 | 51 | KUBERNETES_EXECUTOR, |
| 52 | + LOCAL_EXECUTOR, |
51 | 53 | ) |
52 | 54 | from airflow_breeze.params.build_prod_params import BuildProdParams |
53 | 55 | from airflow_breeze.utils.ci_group import ci_group |
|
80 | 82 | check_async_run_results, |
81 | 83 | run_with_pool, |
82 | 84 | ) |
| 85 | +from airflow_breeze.utils.path_utils import AIRFLOW_CORE_SOURCES_PATH |
83 | 86 | from airflow_breeze.utils.recording import generating_command_images |
84 | 87 | from airflow_breeze.utils.run_utils import RunCommandResult, check_if_image_exists, run_command |
85 | 88 |
|
@@ -143,6 +146,13 @@ def kubernetes_group(): |
143 | 146 | is_flag=True, |
144 | 147 | envvar="FORCE_VENV_SETUP", |
145 | 148 | ) |
| 149 | +option_kubernetes_component = click.option( |
| 150 | + "--component", |
| 151 | + help="Specify specific Airflow component in Kubernetes", |
| 152 | + type=CacheableChoice(ALLOWED_AIRFLOW_KUBERNETES_COMPONENTS), |
| 153 | + show_default=True, |
| 154 | + default=CacheableDefault(ALLOWED_AIRFLOW_KUBERNETES_COMPONENTS[0]), |
| 155 | +) |
146 | 156 | option_kubernetes_version = click.option( |
147 | 157 | "--kubernetes-version", |
148 | 158 | help="Kubernetes version used to create the KinD cluster of.", |
@@ -450,6 +460,124 @@ def delete_cluster(python: str, kubernetes_version: str, all: bool): |
450 | 460 | _delete_cluster(python=python, kubernetes_version=kubernetes_version, output=None) |
451 | 461 |
|
452 | 462 |
|
| 463 | +def _restart(component: str, python: str, kubernetes_version: str, executor: str): |
| 464 | + # Restart deployments that use the code |
| 465 | + components_to_restart = ["scheduler", "api-server", "trigger", "worker"] |
| 466 | + if component != "all": |
| 467 | + components_to_restart = [component] |
| 468 | + |
| 469 | + for component in components_to_restart: |
| 470 | + get_console().print(f"[info]Restarting {component} component...") |
| 471 | + result = run_command_with_k8s_env( |
| 472 | + [ |
| 473 | + "kubectl", |
| 474 | + "rollout", |
| 475 | + "restart", |
| 476 | + "statefulset" if executor == LOCAL_EXECUTOR and component == "scheduler" else "deployment", |
| 477 | + f"airflow-{component}", |
| 478 | + "-n", |
| 479 | + HELM_AIRFLOW_NAMESPACE, |
| 480 | + ], |
| 481 | + python=python, |
| 482 | + kubernetes_version=kubernetes_version, |
| 483 | + check=False, |
| 484 | + ) |
| 485 | + |
| 486 | + if result.returncode != 0: |
| 487 | + get_console().print(f"[warning]Failed to restart {component} component") |
| 488 | + |
| 489 | + |
| 490 | +@kubernetes_group.command( |
| 491 | + name="reload-code", help="Copy airflow-core code to airflow_code PVC and restart all deployment" |
| 492 | +) |
| 493 | +@option_python |
| 494 | +@option_kubernetes_version |
| 495 | +@option_kubernetes_component |
| 496 | +@option_executor |
| 497 | +def reload_code(python: str, kubernetes_version: str, component: str, executor: str): |
| 498 | + result = sync_virtualenv(force_venv_setup=False) |
| 499 | + if result.returncode != 0: |
| 500 | + sys.exit(result.returncode) |
| 501 | + make_sure_kubernetes_tools_are_installed() |
| 502 | + |
| 503 | + # Copy code to PVC via one of the pods that mounts it |
| 504 | + get_console().print("[info]Copying local airflow-core code to airflow_code PVC...") |
| 505 | + |
| 506 | + # First find a pod that has the PVC mounted - typically scheduler is a good choice |
| 507 | + get_pod_cmd = [ |
| 508 | + "kubectl", |
| 509 | + "get", |
| 510 | + "pods", |
| 511 | + "-n", |
| 512 | + HELM_AIRFLOW_NAMESPACE, |
| 513 | + "-l", |
| 514 | + "component=scheduler", |
| 515 | + "-o", |
| 516 | + "jsonpath='{.items[0].metadata.name}'", |
| 517 | + ] |
| 518 | + result = run_command_with_k8s_env( |
| 519 | + get_pod_cmd, |
| 520 | + python=python, |
| 521 | + kubernetes_version=kubernetes_version, |
| 522 | + capture_output=True, |
| 523 | + check=False, |
| 524 | + ) |
| 525 | + |
| 526 | + if result.returncode != 0: |
| 527 | + get_console().print("[error]Failed to find scheduler pod") |
| 528 | + sys.exit(result.returncode) |
| 529 | + |
| 530 | + pod_name = result.stdout.decode().strip().replace("'", "") |
| 531 | + |
| 532 | + # Copy the code to the PVC through the pod |
| 533 | + result = run_command_with_k8s_env( |
| 534 | + [ |
| 535 | + "kubectl", |
| 536 | + "cp", |
| 537 | + str(AIRFLOW_CORE_SOURCES_PATH / "airflow"), |
| 538 | + f"{HELM_AIRFLOW_NAMESPACE}/{pod_name}:/opt/airflow", |
| 539 | + "-c", |
| 540 | + "airflow", |
| 541 | + ], |
| 542 | + python=python, |
| 543 | + kubernetes_version=kubernetes_version, |
| 544 | + check=False, |
| 545 | + ) |
| 546 | + |
| 547 | + if result.returncode != 0: |
| 548 | + get_console().print("[error]Failed to copy code to PVC") |
| 549 | + sys.exit(result.returncode) |
| 550 | + |
| 551 | + _restart( |
| 552 | + component=component, |
| 553 | + python=python, |
| 554 | + kubernetes_version=kubernetes_version, |
| 555 | + executor=executor, |
| 556 | + ) |
| 557 | + |
| 558 | + get_console().print("[success]Code reloaded successfully!") |
| 559 | + return |
| 560 | + |
| 561 | + |
| 562 | +@kubernetes_group.command(name="restart", help="Restart all ( or specific ) airflow components") |
| 563 | +@option_python |
| 564 | +@option_kubernetes_version |
| 565 | +@option_kubernetes_component |
| 566 | +@option_executor |
| 567 | +def restart(python: str, kubernetes_version: str, component: str, executor: str): |
| 568 | + result = sync_virtualenv(force_venv_setup=False) |
| 569 | + if result.returncode != 0: |
| 570 | + sys.exit(result.returncode) |
| 571 | + make_sure_kubernetes_tools_are_installed() |
| 572 | + |
| 573 | + _restart( |
| 574 | + component=component, |
| 575 | + python=python, |
| 576 | + kubernetes_version=kubernetes_version, |
| 577 | + executor=executor, |
| 578 | + ) |
| 579 | + |
| 580 | + |
453 | 581 | def _get_python_kubernetes_version_from_name(cluster_name: str) -> tuple[str | None, str | None]: |
454 | 582 | matcher = re.compile(r"airflow-python-(\d+\.\d+)-(v\d+.\d+.\d+)") |
455 | 583 | cluster_match = matcher.search(cluster_name) |
@@ -1017,6 +1145,8 @@ def _deploy_helm_chart( |
1017 | 1145 | "--namespace", |
1018 | 1146 | HELM_AIRFLOW_NAMESPACE, |
1019 | 1147 | "--set", |
| 1148 | + "devMode.enabled=true", # allow mount airflow code from host machine |
| 1149 | + "--set", |
1020 | 1150 | f"defaultAirflowRepository={params.airflow_image_kubernetes}", |
1021 | 1151 | "--set", |
1022 | 1152 | "defaultAirflowTag=latest", |
|
0 commit comments