Skip to content

Commit

Permalink
feat(airflow): support driver_ephemeral_configmaps_volumes (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Jul 7, 2024
1 parent 954b579 commit 7cb9f2f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
5 changes: 5 additions & 0 deletions spark_on_k8s/airflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from airflow.utils.context import Context
from spark_on_k8s.client import ExecutorInstances, PodResources
from spark_on_k8s.utils.app_manager import SparkAppManager
from spark_on_k8s.utils.types import ConfigMap


class _AirflowKubernetesClientManager(KubernetesClientManager):
Expand Down Expand Up @@ -73,6 +74,7 @@ class SparkOnK8SOperator(BaseOperator):
driver_node_selector: Node selector for the driver pod.
executor_node_selector: Node selector for the executor pods.
driver_tolerations: Tolerations for the driver pod.
driver_ephemeral_configmaps_volumes: List of ConfigMaps to mount as ephemeral volumes to the driver.
spark_on_k8s_service_url: URL of the Spark On K8S service. Defaults to None.
kubernetes_conn_id (str, optional): Kubernetes connection ID. Defaults to
"kubernetes_default".
Expand Down Expand Up @@ -141,6 +143,7 @@ def __init__(
executor_annotations: dict[str, str] | None = None,
driver_tolerations: list[k8s.V1Toleration] | None = None,
executor_pod_template_path: str | None = None,
driver_ephemeral_configmaps_volumes: list[ConfigMap] | None = None,
spark_on_k8s_service_url: str | None = None,
kubernetes_conn_id: str = "kubernetes_default",
poll_interval: int = 10,
Expand Down Expand Up @@ -176,6 +179,7 @@ def __init__(
self.executor_annotations = executor_annotations
self.driver_tolerations = driver_tolerations
self.executor_pod_template_path = executor_pod_template_path
self.driver_ephemeral_configmaps_volumes = driver_ephemeral_configmaps_volumes
self.spark_on_k8s_service_url = spark_on_k8s_service_url
self.kubernetes_conn_id = kubernetes_conn_id
self.poll_interval = poll_interval
Expand Down Expand Up @@ -316,6 +320,7 @@ def _submit_new_job(self, context: Context):
driver_annotations=self.driver_annotations,
executor_annotations=self.executor_annotations,
driver_tolerations=self.driver_tolerations,
driver_ephemeral_configmaps_volumes=self.driver_ephemeral_configmaps_volumes,
executor_pod_template_path=self.executor_pod_template_path,
**submit_app_kwargs,
)
Expand Down
21 changes: 21 additions & 0 deletions tests/airflow/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ def test_execute(self, mock_submit_app):
driver_annotations={"annotation1": "value1"},
executor_annotations={"annotation2": "value2"},
driver_tolerations=test_tolerations,
driver_ephemeral_configmaps_volumes=[
{
"name": "configmap-volume",
"mount_path": "/etc/config",
"sources": [
{"name": "file1.txt", "text": "config1"},
{"name": "file2.txt", "text_path": "/path/to/some/file.txt"},
],
},
],
executor_pod_template_path="s3a://bucket/executor.yml",
spark_on_k8s_service_url="http://localhost:8000",
)
Expand Down Expand Up @@ -77,6 +87,16 @@ def test_execute(self, mock_submit_app):
driver_annotations={"annotation1": "value1"},
executor_annotations={"annotation2": "value2"},
driver_tolerations=test_tolerations,
driver_ephemeral_configmaps_volumes=[
{
"name": "configmap-volume",
"mount_path": "/etc/config",
"sources": [
{"name": "file1.txt", "text": "config1"},
{"name": "file2.txt", "text_path": "/path/to/some/file.txt"},
],
},
],
executor_pod_template_path="s3a://bucket/executor.yml",
)
assert ti_mock.xcom_push.call_count == 3
Expand Down Expand Up @@ -193,6 +213,7 @@ def test_rendering_templates(self, mock_submit_app):
executor_annotations=None,
driver_tolerations=None,
executor_pod_template_path=None,
driver_ephemeral_configmaps_volumes=None,
)
assert app_id_suffix_kwarg() == "-suffix"

Expand Down

0 comments on commit 7cb9f2f

Please sign in to comment.