Skip to content

Commit eae849f

Browse files
committed
WIP - Add Breeze k8s development cmds
1 parent 8dd7713 commit eae849f

File tree

4 files changed

+149
-0
lines changed

4 files changed

+149
-0
lines changed

dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@
4444
)
4545
from airflow_breeze.commands.production_image_commands import run_build_production_image
4646
from airflow_breeze.global_constants import (
47+
ALLOWED_AIRFLOW_KUBERNETES_COMPONENTS,
4748
ALLOWED_EXECUTORS,
4849
ALLOWED_KUBERNETES_VERSIONS,
4950
CELERY_EXECUTOR,
5051
KUBERNETES_EXECUTOR,
52+
LOCAL_EXECUTOR,
5153
)
5254
from airflow_breeze.params.build_prod_params import BuildProdParams
5355
from airflow_breeze.utils.ci_group import ci_group
@@ -80,6 +82,7 @@
8082
check_async_run_results,
8183
run_with_pool,
8284
)
85+
from airflow_breeze.utils.path_utils import AIRFLOW_CORE_SOURCES_PATH
8386
from airflow_breeze.utils.recording import generating_command_images
8487
from airflow_breeze.utils.run_utils import RunCommandResult, check_if_image_exists, run_command
8588

@@ -143,6 +146,13 @@ def kubernetes_group():
143146
is_flag=True,
144147
envvar="FORCE_VENV_SETUP",
145148
)
149+
option_kubernetes_component = click.option(
150+
"--component",
151+
help="Specify specific Airflow component in Kubernetes",
152+
type=CacheableChoice(ALLOWED_AIRFLOW_KUBERNETES_COMPONENTS),
153+
show_default=True,
154+
default=CacheableDefault(ALLOWED_AIRFLOW_KUBERNETES_COMPONENTS[0]),
155+
)
146156
option_kubernetes_version = click.option(
147157
"--kubernetes-version",
148158
help="Kubernetes version used to create the KinD cluster of.",
@@ -450,6 +460,124 @@ def delete_cluster(python: str, kubernetes_version: str, all: bool):
450460
_delete_cluster(python=python, kubernetes_version=kubernetes_version, output=None)
451461

452462

463+
def _restart(component: str, python: str, kubernetes_version: str, executor: str):
464+
# Restart deployments that use the code
465+
components_to_restart = ["scheduler", "api-server", "trigger", "worker"]
466+
if component != "all":
467+
components_to_restart = [component]
468+
469+
for component in components_to_restart:
470+
get_console().print(f"[info]Restarting {component} component...")
471+
result = run_command_with_k8s_env(
472+
[
473+
"kubectl",
474+
"rollout",
475+
"restart",
476+
"statefulset" if executor == LOCAL_EXECUTOR and component == "scheduler" else "deployment",
477+
f"airflow-{component}",
478+
"-n",
479+
HELM_AIRFLOW_NAMESPACE,
480+
],
481+
python=python,
482+
kubernetes_version=kubernetes_version,
483+
check=False,
484+
)
485+
486+
if result.returncode != 0:
487+
get_console().print(f"[warning]Failed to restart {component} component")
488+
489+
490+
@kubernetes_group.command(
491+
name="reload-code", help="Copy airflow-core code to airflow_code PVC and restart all deployment"
492+
)
493+
@option_python
494+
@option_kubernetes_version
495+
@option_kubernetes_component
496+
@option_executor
497+
def reload_code(python: str, kubernetes_version: str, component: str, executor: str):
498+
result = sync_virtualenv(force_venv_setup=False)
499+
if result.returncode != 0:
500+
sys.exit(result.returncode)
501+
make_sure_kubernetes_tools_are_installed()
502+
503+
# Copy code to PVC via one of the pods that mounts it
504+
get_console().print("[info]Copying local airflow-core code to airflow_code PVC...")
505+
506+
# First find a pod that has the PVC mounted - typically scheduler is a good choice
507+
get_pod_cmd = [
508+
"kubectl",
509+
"get",
510+
"pods",
511+
"-n",
512+
HELM_AIRFLOW_NAMESPACE,
513+
"-l",
514+
"component=scheduler",
515+
"-o",
516+
"jsonpath='{.items[0].metadata.name}'",
517+
]
518+
result = run_command_with_k8s_env(
519+
get_pod_cmd,
520+
python=python,
521+
kubernetes_version=kubernetes_version,
522+
capture_output=True,
523+
check=False,
524+
)
525+
526+
if result.returncode != 0:
527+
get_console().print("[error]Failed to find scheduler pod")
528+
sys.exit(result.returncode)
529+
530+
pod_name = result.stdout.decode().strip().replace("'", "")
531+
532+
# Copy the code to the PVC through the pod
533+
result = run_command_with_k8s_env(
534+
[
535+
"kubectl",
536+
"cp",
537+
str(AIRFLOW_CORE_SOURCES_PATH / "airflow"),
538+
f"{HELM_AIRFLOW_NAMESPACE}/{pod_name}:/opt/airflow",
539+
"-c",
540+
"airflow",
541+
],
542+
python=python,
543+
kubernetes_version=kubernetes_version,
544+
check=False,
545+
)
546+
547+
if result.returncode != 0:
548+
get_console().print("[error]Failed to copy code to PVC")
549+
sys.exit(result.returncode)
550+
551+
_restart(
552+
component=component,
553+
python=python,
554+
kubernetes_version=kubernetes_version,
555+
executor=executor,
556+
)
557+
558+
get_console().print("[success]Code reloaded successfully!")
559+
return
560+
561+
562+
@kubernetes_group.command(name="restart", help="Restart all ( or specific ) airflow components")
563+
@option_python
564+
@option_kubernetes_version
565+
@option_kubernetes_component
566+
@option_executor
567+
def restart(python: str, kubernetes_version: str, component: str, executor: str):
568+
result = sync_virtualenv(force_venv_setup=False)
569+
if result.returncode != 0:
570+
sys.exit(result.returncode)
571+
make_sure_kubernetes_tools_are_installed()
572+
573+
_restart(
574+
component=component,
575+
python=python,
576+
kubernetes_version=kubernetes_version,
577+
executor=executor,
578+
)
579+
580+
453581
def _get_python_kubernetes_version_from_name(cluster_name: str) -> tuple[str | None, str | None]:
454582
matcher = re.compile(r"airflow-python-(\d+\.\d+)-(v\d+.\d+.\d+)")
455583
cluster_match = matcher.search(cluster_name)
@@ -1017,6 +1145,8 @@ def _deploy_helm_chart(
10171145
"--namespace",
10181146
HELM_AIRFLOW_NAMESPACE,
10191147
"--set",
1148+
"devMode.enabled=true", # allow mount airflow code from host machine
1149+
"--set",
10201150
f"defaultAirflowRepository={params.airflow_image_kubernetes}",
10211151
"--set",
10221152
"defaultAirflowTag=latest",

dev/breeze/src/airflow_breeze/commands/kubernetes_commands_config.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
"delete-cluster",
2929
],
3030
}
31+
KUBERNETES_DEVELOPMENT_COMMANDS: dict[str, str | list[str]] = {
32+
"name": "K8S development commands",
33+
"commands": ["reload-code", "restart"],
34+
}
3135
KUBERNETES_INSPECTION_COMMANDS: dict[str, str | list[str]] = {
3236
"name": "K8S inspection commands",
3337
"commands": ["status", "logs"],
@@ -183,6 +187,18 @@
183187
"options": ["--python", "--kubernetes-version", "--all"],
184188
},
185189
],
190+
"breeze k8s reload-code": [
191+
{
192+
"name": "K8S reload code flags",
193+
"options": ["--python", "--kubernetes-version", "--component", "--executor"],
194+
}
195+
],
196+
"breeze k8s restart": [
197+
{
198+
"name": "K8S restart flags",
199+
"options": ["--python", "--kubernetes-version", "--component", "--executor"],
200+
}
201+
],
186202
"breeze k8s status": [
187203
{
188204
"name": "K8S cluster status flags",

dev/breeze/src/airflow_breeze/configure_rich_click.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from airflow_breeze.commands.developer_commands_config import DEVELOPER_COMMANDS, DEVELOPER_PARAMETERS
3939
from airflow_breeze.commands.kubernetes_commands_config import (
4040
KUBERNETES_CLUSTER_COMMANDS,
41+
KUBERNETES_DEVELOPMENT_COMMANDS,
4142
KUBERNETES_INSPECTION_COMMANDS,
4243
KUBERNETES_PARAMETERS,
4344
KUBERNETES_TESTING_COMMANDS,
@@ -100,6 +101,7 @@
100101
"breeze testing": TESTING_COMMANDS,
101102
"breeze k8s": [
102103
KUBERNETES_CLUSTER_COMMANDS,
104+
KUBERNETES_DEVELOPMENT_COMMANDS,
103105
KUBERNETES_INSPECTION_COMMANDS,
104106
KUBERNETES_TESTING_COMMANDS,
105107
],

dev/breeze/src/airflow_breeze/global_constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@
152152
CONSTRAINTS_NO_PROVIDERS = "constraints-no-providers"
153153

154154
ALLOWED_KIND_OPERATIONS = ["start", "stop", "restart", "status", "deploy", "test", "shell", "k9s"]
155+
ALLOWED_AIRFLOW_KUBERNETES_COMPONENTS = ["all", "scheduler", "api-server", "trigger", "workers"]
155156
ALLOWED_CONSTRAINTS_MODES_CI = [CONSTRAINTS_SOURCE_PROVIDERS, CONSTRAINTS, CONSTRAINTS_NO_PROVIDERS]
156157
ALLOWED_CONSTRAINTS_MODES_PROD = [CONSTRAINTS, CONSTRAINTS_NO_PROVIDERS, CONSTRAINTS_SOURCE_PROVIDERS]
157158

0 commit comments

Comments
 (0)