Skip to content

Commit

Permalink
feat: support executor pod template path
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Mar 2, 2024
1 parent 843837b commit d873a9b
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ and the CLI. Here is a list of the available environment variables:
| SPARK_ON_K8S_SPARK_EXECUTOR_LABELS | The labels to use for the executor pods | {} |
| SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS | The annotations to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS | The annotations to use for the executor pods | {} |
| SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH | The path to the executor pod template | |


## Examples
Expand Down
3 changes: 3 additions & 0 deletions spark_on_k8s/airflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def __init__(
driver_annotations: dict[str, str] | None = None,
executor_annotations: dict[str, str] | None = None,
driver_tolerations: list[k8s.V1Toleration] | None = None,
executor_pod_template_path: str | None = None,
kubernetes_conn_id: str = "kubernetes_default",
poll_interval: int = 10,
deferrable: bool = False,
Expand Down Expand Up @@ -158,6 +159,7 @@ def __init__(
self.driver_annotations = driver_annotations
self.executor_annotations = executor_annotations
self.driver_tolerations = driver_tolerations
self.executor_pod_template_path = executor_pod_template_path
self.kubernetes_conn_id = kubernetes_conn_id
self.poll_interval = poll_interval
self.deferrable = deferrable
Expand Down Expand Up @@ -254,6 +256,7 @@ def execute(self, context):
driver_annotations=self.driver_annotations,
executor_annotations=self.executor_annotations,
driver_tolerations=self.driver_tolerations,
executor_pod_template_path=self.executor_pod_template_path,
)
if self.app_waiter == "no_wait":
return
Expand Down
4 changes: 4 additions & 0 deletions spark_on_k8s/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
executor_memory_overhead_option,
executor_min_instances_option,
executor_node_selector_option,
executor_pod_template_path_option,
force_option,
image_pull_policy_option,
logs_option,
Expand Down Expand Up @@ -129,6 +130,7 @@ def wait(app_id: str, namespace: str):
executor_labels_option,
driver_annotations_option,
executor_annotations_option,
executor_pod_template_path_option,
],
help="Submit a Spark application.",
)
Expand Down Expand Up @@ -163,6 +165,7 @@ def submit(
executor_labels: dict[str, str],
driver_annotations: dict[str, str],
executor_annotations: dict[str, str],
executor_pod_template_path: str,
):
from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S

Expand Down Expand Up @@ -203,4 +206,5 @@ def submit(
executor_labels=executor_labels,
driver_annotations=driver_annotations,
executor_annotations=executor_annotations,
executor_pod_template_path=executor_pod_template_path,
)
7 changes: 7 additions & 0 deletions spark_on_k8s/cli/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,10 @@ def validate_list_option(ctx, param, value):
show_default=True,
help="Annotations for the executor in key=value format. Can be repeated.",
)
executor_pod_template_path_option = click.Option(
("--executor-pod-template-path", "executor_pod_template_path"),
type=str,
default=Configuration.SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH,
show_default=True,
help="The path to the executor pod template file.",
)
22 changes: 22 additions & 0 deletions spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def submit_app(
driver_labels: dict[str, str] | ArgNotSet = NOTSET,
executor_labels: dict[str, str] | ArgNotSet = NOTSET,
driver_tolerations: list[k8s.V1Toleration] | ArgNotSet = NOTSET,
executor_pod_template_path: str | ArgNotSet = NOTSET,
) -> str:
"""Submit a Spark app to Kubernetes
Expand Down Expand Up @@ -168,6 +169,7 @@ def submit_app(
driver_node_selector: Node selector for the driver
executor_node_selector: Node selector for the executors
driver_tolerations: List of tolerations for the driver
executor_pod_template_path: Path to the executor pod template file
Returns:
Name of the Spark application pod
Expand Down Expand Up @@ -262,6 +264,8 @@ def submit_app(
executor_labels = {}
if driver_tolerations is NOTSET or driver_tolerations is None:
driver_tolerations = []
if executor_pod_template_path is NOTSET or executor_pod_template_path is None:
executor_pod_template_path = Configuration.SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH

spark_conf = spark_conf or {}
main_class_parameters = app_arguments or []
Expand Down Expand Up @@ -313,6 +317,8 @@ def submit_app(
basic_conf.update(self._executor_labels(labels=executor_labels))
if executor_annotations:
basic_conf.update(self._executor_annotations(annotations=executor_annotations))
if executor_pod_template_path:
basic_conf.update(self._executor_pod_template_path(executor_pod_template_path))
driver_command_args = ["driver", "--master", "k8s://https://kubernetes.default.svc.cluster.local:443"]
if class_name:
driver_command_args.extend(["--class", class_name])
Expand Down Expand Up @@ -607,3 +613,19 @@ def _executor_annotations(
if not annotations:
return {}
return {f"spark.kubernetes.executor.annotation.{key}": value for key, value in annotations.items()}

@staticmethod
def _executor_pod_template_path(
executor_pod_template_path: str | None,
) -> dict[str, str]:
"""Spark configuration to set the executor pod template file
Args:
executor_pod_template_path: Path to the executor pod template file
Returns:
Spark configuration dictionary
"""
if not executor_pod_template_path:
return {}
return {"spark.kubernetes.executor.podTemplateFile": executor_pod_template_path}
1 change: 1 addition & 0 deletions spark_on_k8s/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Configuration:
SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS = json.loads(
getenv("SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS", "{}")
)
SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH = getenv("SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH", None)
try:
from kubernetes_asyncio import client as async_k8s

Expand Down
3 changes: 3 additions & 0 deletions tests/airflow/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def test_execute(self, mock_submit_app):
driver_annotations={"annotation1": "value1"},
executor_annotations={"annotation2": "value2"},
driver_tolerations=test_tolerations,
executor_pod_template_path="s3a://bucket/executor.yml",
)
spark_app_task.execute(None)
mock_submit_app.assert_called_once_with(
Expand Down Expand Up @@ -69,6 +70,7 @@ def test_execute(self, mock_submit_app):
driver_annotations={"annotation1": "value1"},
executor_annotations={"annotation2": "value2"},
driver_tolerations=test_tolerations,
executor_pod_template_path="s3a://bucket/executor.yml",
)

@mock.patch("spark_on_k8s.client.SparkOnK8S.submit_app")
Expand Down Expand Up @@ -158,4 +160,5 @@ def test_rendering_templates(self, mock_submit_app):
driver_annotations=None,
executor_annotations=None,
driver_tolerations=None,
executor_pod_template_path=None,
)
34 changes: 34 additions & 0 deletions tests/test_spark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,3 +803,37 @@ def test_submit_app_with_tolerations(
effect="NoExecute",
),
]

@mock.patch("spark_on_k8s.k8s.sync_client.KubernetesClientManager.create_client")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_service")
@freeze_time(FAKE_TIME)
def test_submit_app_with_executor_pod_template_path(
self, mock_create_namespaced_service, mock_create_namespaced_pod, mock_create_client
):
"""Test the method submit_app"""

spark_client = SparkOnK8S()
spark_client.submit_app(
image="pyspark-job",
app_path="local:///opt/spark/work-dir/job.py",
namespace="spark",
service_account="spark",
app_name="pyspark-job-example",
app_arguments=["100000"],
app_waiter="no_wait",
image_pull_policy="Never",
ui_reverse_proxy=True,
driver_resources=PodResources(cpu=1, memory=2048, memory_overhead=1024),
executor_instances=ExecutorInstances(min=2, max=5, initial=5),
executor_pod_template_path="s3a://bucket/executor.yml",
)

created_pod = mock_create_namespaced_pod.call_args[1]["body"]
arguments = created_pod.spec.containers[0].args
executor_config = {
conf.split("=")[0]: conf.split("=")[1]
for conf in arguments
if conf.startswith("spark.kubernetes.executor")
}
assert executor_config.get("spark.kubernetes.executor.podTemplateFile") == "s3a://bucket/executor.yml"

0 comments on commit d873a9b

Please sign in to comment.