From d873a9b576725784113aca6d02e2e5c88420f98d Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sun, 3 Mar 2024 00:59:09 +0100 Subject: [PATCH] feat: support executor pod template path --- README.md | 1 + spark_on_k8s/airflow/operators.py | 3 +++ spark_on_k8s/cli/app.py | 4 ++++ spark_on_k8s/cli/options.py | 7 ++++++ spark_on_k8s/client.py | 22 +++++++++++++++++++ spark_on_k8s/utils/configuration.py | 1 + tests/airflow/test_operators.py | 3 +++ tests/test_spark_client.py | 34 +++++++++++++++++++++++++++++ 8 files changed, 75 insertions(+) diff --git a/README.md b/README.md index edd0013..5bb1652 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,7 @@ and the CLI. Here is a list of the available environment variables: | SPARK_ON_K8S_SPARK_EXECUTOR_LABELS | The labels to use for the executor pods | {} | | SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS | The annotations to use for the driver pod | {} | | SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS | The annotations to use for the executor pods | {} | +| SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH | The path to the executor pod template | | ## Examples diff --git a/spark_on_k8s/airflow/operators.py b/spark_on_k8s/airflow/operators.py index a8804e4..6274c07 100644 --- a/spark_on_k8s/airflow/operators.py +++ b/spark_on_k8s/airflow/operators.py @@ -126,6 +126,7 @@ def __init__( driver_annotations: dict[str, str] | None = None, executor_annotations: dict[str, str] | None = None, driver_tolerations: list[k8s.V1Toleration] | None = None, + executor_pod_template_path: str | None = None, kubernetes_conn_id: str = "kubernetes_default", poll_interval: int = 10, deferrable: bool = False, @@ -158,6 +159,7 @@ def __init__( self.driver_annotations = driver_annotations self.executor_annotations = executor_annotations self.driver_tolerations = driver_tolerations + self.executor_pod_template_path = executor_pod_template_path self.kubernetes_conn_id = kubernetes_conn_id self.poll_interval = poll_interval self.deferrable = deferrable @@ -254,6 +256,7 @@ def execute(self, context): driver_annotations=self.driver_annotations, executor_annotations=self.executor_annotations, driver_tolerations=self.driver_tolerations, + executor_pod_template_path=self.executor_pod_template_path, ) if self.app_waiter == "no_wait": return diff --git a/spark_on_k8s/cli/app.py b/spark_on_k8s/cli/app.py index e440a9e..1194d2e 100644 --- a/spark_on_k8s/cli/app.py +++ b/spark_on_k8s/cli/app.py @@ -26,6 +26,7 @@ executor_memory_overhead_option, executor_min_instances_option, executor_node_selector_option, + executor_pod_template_path_option, force_option, image_pull_policy_option, logs_option, @@ -129,6 +130,7 @@ def wait(app_id: str, namespace: str): executor_labels_option, driver_annotations_option, executor_annotations_option, + executor_pod_template_path_option, ], help="Submit a Spark application.", ) @@ -163,6 +165,7 @@ def submit( executor_labels: dict[str, str], driver_annotations: dict[str, str], executor_annotations: dict[str, str], + executor_pod_template_path: str, ): from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S @@ -203,4 +206,5 @@ def submit( executor_labels=executor_labels, driver_annotations=driver_annotations, executor_annotations=executor_annotations, + executor_pod_template_path=executor_pod_template_path, ) diff --git a/spark_on_k8s/cli/options.py b/spark_on_k8s/cli/options.py index 81e1ad7..08c26f0 100644 --- a/spark_on_k8s/cli/options.py +++ b/spark_on_k8s/cli/options.py @@ -254,3 +254,10 @@ def validate_list_option(ctx, param, value): show_default=True, help="Annotations for the executor in key=value format. Can be repeated.", ) +executor_pod_template_path_option = click.Option( + ("--executor-pod-template-path", "executor_pod_template_path"), + type=str, + default=Configuration.SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH, + show_default=True, + help="The path to the executor pod template file.", +) diff --git a/spark_on_k8s/client.py b/spark_on_k8s/client.py index 42ea5c6..ce47ff4 100644 --- a/spark_on_k8s/client.py +++ b/spark_on_k8s/client.py @@ -130,6 +130,7 @@ def submit_app( driver_labels: dict[str, str] | ArgNotSet = NOTSET, executor_labels: dict[str, str] | ArgNotSet = NOTSET, driver_tolerations: list[k8s.V1Toleration] | ArgNotSet = NOTSET, + executor_pod_template_path: str | ArgNotSet = NOTSET, ) -> str: """Submit a Spark app to Kubernetes @@ -168,6 +169,7 @@ def submit_app( driver_node_selector: Node selector for the driver executor_node_selector: Node selector for the executors driver_tolerations: List of tolerations for the driver + executor_pod_template_path: Path to the executor pod template file Returns: Name of the Spark application pod @@ -262,6 +264,8 @@ def submit_app( executor_labels = {} if driver_tolerations is NOTSET or driver_tolerations is None: driver_tolerations = [] + if executor_pod_template_path is NOTSET or executor_pod_template_path is None: + executor_pod_template_path = Configuration.SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH spark_conf = spark_conf or {} main_class_parameters = app_arguments or [] @@ -313,6 +317,8 @@ def submit_app( basic_conf.update(self._executor_labels(labels=executor_labels)) if executor_annotations: basic_conf.update(self._executor_annotations(annotations=executor_annotations)) + if executor_pod_template_path: + basic_conf.update(self._executor_pod_template_path(executor_pod_template_path)) driver_command_args = ["driver", "--master", "k8s://https://kubernetes.default.svc.cluster.local:443"] if class_name: driver_command_args.extend(["--class", class_name]) @@ -607,3 +613,19 @@ def _executor_annotations( if not annotations: return {} return {f"spark.kubernetes.executor.annotation.{key}": value for key, value in annotations.items()} + + @staticmethod + def _executor_pod_template_path( + executor_pod_template_path: str | None, + ) -> dict[str, str]: + """Spark configuration to set the executor pod template file + + Args: + executor_pod_template_path: Path to the executor pod template file + + Returns: + Spark configuration dictionary + """ + if not executor_pod_template_path: + return {} + return {"spark.kubernetes.executor.podTemplateFile": executor_pod_template_path} diff --git a/spark_on_k8s/utils/configuration.py b/spark_on_k8s/utils/configuration.py index fd09d83..ee42f0d 100644 --- a/spark_on_k8s/utils/configuration.py +++ b/spark_on_k8s/utils/configuration.py @@ -70,6 +70,7 @@ class Configuration: SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS = json.loads( getenv("SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS", "{}") ) + SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH = getenv("SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH", None) try: from kubernetes_asyncio import client as async_k8s diff --git a/tests/airflow/test_operators.py b/tests/airflow/test_operators.py index e9b2fb9..7037446 100644 --- a/tests/airflow/test_operators.py +++ b/tests/airflow/test_operators.py @@ -41,6 +41,7 @@ def test_execute(self, mock_submit_app): driver_annotations={"annotation1": "value1"}, executor_annotations={"annotation2": "value2"}, driver_tolerations=test_tolerations, + executor_pod_template_path="s3a://bucket/executor.yml", ) spark_app_task.execute(None) mock_submit_app.assert_called_once_with( @@ -69,6 +70,7 @@ def test_execute(self, mock_submit_app): driver_annotations={"annotation1": "value1"}, executor_annotations={"annotation2": "value2"}, driver_tolerations=test_tolerations, + executor_pod_template_path="s3a://bucket/executor.yml", ) @mock.patch("spark_on_k8s.client.SparkOnK8S.submit_app") @@ -158,4 +160,5 @@ def test_rendering_templates(self, mock_submit_app): driver_annotations=None, executor_annotations=None, driver_tolerations=None, + executor_pod_template_path=None, ) diff --git a/tests/test_spark_client.py b/tests/test_spark_client.py index 05b24a8..3506ebe 100644 --- a/tests/test_spark_client.py +++ b/tests/test_spark_client.py @@ -803,3 +803,37 @@ def test_submit_app_with_tolerations( effect="NoExecute", ), ] + + @mock.patch("spark_on_k8s.k8s.sync_client.KubernetesClientManager.create_client") + @mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod") + @mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_service") + @freeze_time(FAKE_TIME) + def test_submit_app_with_executor_pod_template_path( + self, mock_create_namespaced_service, mock_create_namespaced_pod, mock_create_client + ): + """Test the method submit_app""" + + spark_client = SparkOnK8S() + spark_client.submit_app( + image="pyspark-job", + app_path="local:///opt/spark/work-dir/job.py", + namespace="spark", + service_account="spark", + app_name="pyspark-job-example", + app_arguments=["100000"], + app_waiter="no_wait", + image_pull_policy="Never", + ui_reverse_proxy=True, + driver_resources=PodResources(cpu=1, memory=2048, memory_overhead=1024), + executor_instances=ExecutorInstances(min=2, max=5, initial=5), + executor_pod_template_path="s3a://bucket/executor.yml", + ) + + created_pod = mock_create_namespaced_pod.call_args[1]["body"] + arguments = created_pod.spec.containers[0].args + executor_config = { + conf.split("=")[0]: conf.split("=")[1] + for conf in arguments + if conf.startswith("spark.kubernetes.executor") + } + assert executor_config.get("spark.kubernetes.executor.podTemplateFile") == "s3a://bucket/executor.yml"