Skip to content

Commit

Permalink
feat: support driver and executor labels and annotations (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Mar 1, 2024
1 parent cc8ec76 commit e797a58
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 1 deletion.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ and the CLI. Here is a list of the available environment variables:
| SPARK_ON_K8S_API_LIMIT_CONCURRENCY | The limit concurrency to use for the API | 1000 |
| SPARK_ON_K8S_SPARK_DRIVER_NODE_SELECTOR | The node selector to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR | The node selector to use for the executor pods | {} |
| SPARK_ON_K8S_SPARK_DRIVER_LABELS | The labels to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_LABELS | The labels to use for the executor pods | {} |
| SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS | The annotations to use for the driver pod | {} |
| SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS | The annotations to use for the executor pods | {} |


## Examples
Expand Down
12 changes: 12 additions & 0 deletions spark_on_k8s/airflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ def __init__(
executor_volume_mounts: list[k8s.V1VolumeMount] | None = None,
driver_node_selector: dict[str, str] | None = None,
executor_node_selector: dict[str, str] | None = None,
driver_labels: dict[str, str] | None = None,
executor_labels: dict[str, str] | None = None,
driver_annotations: dict[str, str] | None = None,
executor_annotations: dict[str, str] | None = None,
kubernetes_conn_id: str = "kubernetes_default",
poll_interval: int = 10,
deferrable: bool = False,
Expand Down Expand Up @@ -147,6 +151,10 @@ def __init__(
self.executor_volume_mounts = executor_volume_mounts
self.driver_node_selector = driver_node_selector
self.executor_node_selector = executor_node_selector
self.driver_labels = driver_labels
self.executor_labels = executor_labels
self.driver_annotations = driver_annotations
self.executor_annotations = executor_annotations
self.kubernetes_conn_id = kubernetes_conn_id
self.poll_interval = poll_interval
self.deferrable = deferrable
Expand Down Expand Up @@ -238,6 +246,10 @@ def execute(self, context):
executor_volume_mounts=self.executor_volume_mounts,
driver_node_selector=self.driver_node_selector,
executor_node_selector=self.executor_node_selector,
driver_labels=self.driver_labels,
executor_labels=self.executor_labels,
driver_annotations=self.driver_annotations,
executor_annotations=self.executor_annotations,
)
if self.app_waiter == "no_wait":
return
Expand Down
16 changes: 16 additions & 0 deletions spark_on_k8s/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@
app_path_option,
class_name_option,
docker_image_option,
driver_annotations_option,
driver_cpu_option,
driver_env_vars_from_secrets_option,
driver_labels_option,
driver_memory_option,
driver_memory_overhead_option,
driver_node_selector_option,
executor_annotations_option,
executor_cpu_option,
executor_initial_instances_option,
executor_labels_option,
executor_max_instances_option,
executor_memory_option,
executor_memory_overhead_option,
Expand Down Expand Up @@ -121,6 +125,10 @@ def wait(app_id: str, namespace: str):
driver_env_vars_from_secrets_option,
driver_node_selector_option,
executor_node_selector_option,
driver_labels_option,
executor_labels_option,
driver_annotations_option,
executor_annotations_option,
],
help="Submit a Spark application.",
)
Expand Down Expand Up @@ -151,6 +159,10 @@ def submit(
driver_env_vars_from_secrets: list[str],
driver_node_selector: dict[str, str],
executor_node_selector: dict[str, str],
driver_labels: dict[str, str],
executor_labels: dict[str, str],
driver_annotations: dict[str, str],
executor_annotations: dict[str, str],
):
from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S

Expand Down Expand Up @@ -187,4 +199,8 @@ def submit(
driver_env_vars_from_secrets=driver_env_vars_from_secrets,
driver_node_selector=driver_node_selector,
executor_node_selector=executor_node_selector,
driver_labels=driver_labels,
executor_labels=executor_labels,
driver_annotations=driver_annotations,
executor_annotations=executor_annotations,
)
36 changes: 36 additions & 0 deletions spark_on_k8s/cli/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,39 @@ def validate_list_option(ctx, param, value):
show_default=True,
help="Node selector for the executor in key=value format. Can be repeated.",
)
def _dict_option(
    param_decls: tuple[str, str],
    *,
    default: dict[str, str],
    help_text: str,
) -> click.Option:
    """Build a repeatable ``key=value`` CLI option collected into a dict.

    Factors out the configuration shared by the label/annotation options
    below so each option differs only in its flag names, default, and help.

    Args:
        param_decls: Click parameter declarations (flag name, variable name).
        default: Default mapping, sourced from ``Configuration``.
        help_text: Help message shown by ``--help``.

    Returns:
        A ``click.Option`` that accepts repeated ``key=value`` values and
        merges them into a single dict via ``validate_dictionary_option``.
    """
    return click.Option(
        param_decls,
        type=str,
        multiple=True,
        callback=validate_dictionary_option,
        default=default,
        show_default=True,
        help=help_text,
    )


# Labels for the driver pod in key=value format. Can be repeated.
driver_labels_option = _dict_option(
    ("--driver-label", "driver_labels"),
    default=Configuration.SPARK_ON_K8S_SPARK_DRIVER_LABELS,
    help_text="Labels for the driver in key=value format. Can be repeated.",
)
# Labels for the executor pods in key=value format. Can be repeated.
executor_labels_option = _dict_option(
    ("--executor-label", "executor_labels"),
    default=Configuration.SPARK_ON_K8S_SPARK_EXECUTOR_LABELS,
    help_text="Labels for the executor in key=value format. Can be repeated.",
)
# Annotations for the driver pod in key=value format. Can be repeated.
driver_annotations_option = _dict_option(
    ("--driver-annotation", "driver_annotations"),
    default=Configuration.SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS,
    help_text="Annotations for the driver in key=value format. Can be repeated.",
)
# Annotations for the executor pods in key=value format. Can be repeated.
executor_annotations_option = _dict_option(
    ("--executor-annotation", "executor_annotations"),
    default=Configuration.SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS,
    help_text="Annotations for the executor in key=value format. Can be repeated.",
)
51 changes: 50 additions & 1 deletion spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ def submit_app(
executor_volume_mounts: list[k8s.V1VolumeMount] | ArgNotSet = NOTSET,
driver_node_selector: dict[str, str] | ArgNotSet = NOTSET,
executor_node_selector: dict[str, str] | ArgNotSet = NOTSET,
driver_annotations: dict[str, str] | ArgNotSet = NOTSET,
executor_annotations: dict[str, str] | ArgNotSet = NOTSET,
driver_labels: dict[str, str] | ArgNotSet = NOTSET,
executor_labels: dict[str, str] | ArgNotSet = NOTSET,
) -> str:
"""Submit a Spark app to Kubernetes
Expand Down Expand Up @@ -246,6 +250,14 @@ def submit_app(
driver_node_selector = {}
if executor_node_selector is NOTSET or executor_node_selector is None:
executor_node_selector = {}
if driver_annotations is NOTSET or driver_annotations is None:
driver_annotations = {}
if executor_annotations is NOTSET or executor_annotations is None:
executor_annotations = {}
if driver_labels is NOTSET or driver_labels is None:
driver_labels = {}
if executor_labels is NOTSET or executor_labels is None:
executor_labels = {}

spark_conf = spark_conf or {}
main_class_parameters = app_arguments or []
Expand Down Expand Up @@ -293,6 +305,10 @@ def submit_app(
)
if executor_node_selector:
basic_conf.update(self._executor_node_selector(node_selector=executor_node_selector))
if executor_labels:
basic_conf.update(self._executor_labels(labels=executor_labels))
if executor_annotations:
basic_conf.update(self._executor_annotations(annotations=executor_annotations))
driver_command_args = ["driver", "--master", "k8s://https://kubernetes.default.svc.cluster.local:443"]
if class_name:
driver_command_args.extend(["--class", class_name])
Expand All @@ -306,7 +322,8 @@ def submit_app(
image_pull_policy=image_pull_policy,
namespace=namespace,
args=driver_command_args,
extra_labels=extra_labels,
extra_labels={**extra_labels, **driver_labels},
annotations=driver_annotations,
pod_resources={
"requests": {
"cpu": f"{driver_resources.cpu}",
Expand Down Expand Up @@ -553,3 +570,35 @@ def _executor_node_selector(
return {
f"spark.kubernetes.executor.node.selector.{key}": value for key, value in node_selector.items()
}

@staticmethod
def _executor_labels(
labels: dict[str, str] | None,
) -> dict[str, str]:
"""Spark configuration to set labels for the executors
Args:
labels: Labels for the executors
Returns:
Spark configuration dictionary
"""
if not labels:
return {}
return {f"spark.kubernetes.executor.label.{key}": value for key, value in labels.items()}

@staticmethod
def _executor_annotations(
annotations: dict[str, str] | None,
) -> dict[str, str]:
"""Spark configuration to set annotations for the executors
Args:
annotations: Annotations for the executors
Returns:
Spark configuration dictionary
"""
if not annotations:
return {}
return {f"spark.kubernetes.executor.annotation.{key}": value for key, value in annotations.items()}
3 changes: 3 additions & 0 deletions spark_on_k8s/utils/app_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ def create_spark_pod_spec(
args: list[str] | None = None,
image_pull_policy: Literal["Always", "Never", "IfNotPresent"] = "IfNotPresent",
extra_labels: dict[str, str] | None = None,
annotations: dict[str, str] | None = None,
env_from_secrets: list[str] | None = None,
volumes: list[k8s.V1Volume] | None = None,
volume_mounts: list[k8s.V1VolumeMount] | None = None,
Expand All @@ -339,6 +340,7 @@ def create_spark_pod_spec(
args: List of arguments to pass to the container
image_pull_policy: Image pull policy for the driver and executors, defaults to "IfNotPresent"
extra_labels: Dictionary of extra labels to add to the pod template
annotations: Dictionary of annotations to add to the pod template
env_from_secrets: List of secrets to load environment variables from
volumes: List of volumes to mount in the pod
volume_mounts: List of volume mounts to mount in the container
Expand All @@ -355,6 +357,7 @@ def create_spark_pod_spec(
app_id=app_id,
extra_labels=extra_labels,
),
annotations=annotations,
)
pod_spec = k8s.V1PodSpec(
service_account_name=service_account,
Expand Down
6 changes: 6 additions & 0 deletions spark_on_k8s/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ class Configuration:
SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR = json.loads(
getenv("SPARK_ON_K8S_SPARK_EXECUTOR_NODE_SELECTOR", "{}")
)
# Pod labels/annotations for the Spark driver and executors. Each is read
# from an environment variable holding a JSON object string
# (e.g. '{"team": "data"}'); an unset variable falls back to "{}", i.e. an
# empty dict. json.loads raises at import time if the variable is malformed.
SPARK_ON_K8S_SPARK_DRIVER_LABELS = json.loads(getenv("SPARK_ON_K8S_SPARK_DRIVER_LABELS", "{}"))
SPARK_ON_K8S_SPARK_EXECUTOR_LABELS = json.loads(getenv("SPARK_ON_K8S_SPARK_EXECUTOR_LABELS", "{}"))
SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS = json.loads(getenv("SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS", "{}"))
SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS = json.loads(
    getenv("SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS", "{}")
)
try:
from kubernetes_asyncio import client as async_k8s

Expand Down
12 changes: 12 additions & 0 deletions tests/airflow/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def test_execute(self, mock_submit_app):
ui_reverse_proxy=True,
driver_node_selector={"node-type": "driver"},
executor_node_selector={"node-type": "executor"},
driver_labels={"label1": "value1"},
executor_labels={"label2": "value2"},
driver_annotations={"annotation1": "value1"},
executor_annotations={"annotation2": "value2"},
)
spark_app_task.execute(None)
mock_submit_app.assert_called_once_with(
Expand All @@ -53,6 +57,10 @@ def test_execute(self, mock_submit_app):
executor_volume_mounts=None,
driver_node_selector={"node-type": "driver"},
executor_node_selector={"node-type": "executor"},
driver_labels={"label1": "value1"},
executor_labels={"label2": "value2"},
driver_annotations={"annotation1": "value1"},
executor_annotations={"annotation2": "value2"},
)

@mock.patch("spark_on_k8s.client.SparkOnK8S.submit_app")
Expand Down Expand Up @@ -137,4 +145,8 @@ def test_rendering_templates(self, mock_submit_app):
executor_volume_mounts=None,
driver_node_selector=None,
executor_node_selector=None,
driver_labels=None,
executor_labels=None,
driver_annotations=None,
executor_annotations=None,
)
64 changes: 64 additions & 0 deletions tests/test_spark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,3 +687,67 @@ def test_submit_app_with_node_selectors(
"spark.kubernetes.executor.node.selector.component": "spark",
"spark.kubernetes.executor.node.selector.role": "executor",
}

@mock.patch("spark_on_k8s.k8s.sync_client.KubernetesClientManager.create_client")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_service")
@freeze_time(FAKE_TIME)
def test_submit_app_with_labels_and_annotations(
self, mock_create_namespaced_service, mock_create_namespaced_pod, mock_create_client
):
"""Test the method submit_app"""

spark_client = SparkOnK8S()
spark_client.submit_app(
image="pyspark-job",
app_path="local:///opt/spark/work-dir/job.py",
namespace="spark",
service_account="spark",
app_name="pyspark-job-example",
app_arguments=["100000"],
app_waiter="no_wait",
image_pull_policy="Never",
ui_reverse_proxy=True,
driver_resources=PodResources(cpu=1, memory=2048, memory_overhead=1024),
executor_instances=ExecutorInstances(min=2, max=5, initial=5),
driver_labels={"label1": "value1", "label2": "value2"},
executor_labels={"label3": "value3", "label4": "value4"},
driver_annotations={"annotation1": "value1", "annotation2": "value2"},
executor_annotations={"annotation3": "value3", "annotation4": "value4"},
)

created_pod = mock_create_namespaced_pod.call_args[1]["body"]
assert created_pod.metadata.labels == {
# default labels
"spark-app-name": "pyspark-job-example",
"spark-app-id": "pyspark-job-example-20240114121231",
"spark-role": "driver",
# ui reverse proxy label
"spark-ui-proxy": "true",
# custom labels
"label1": "value1",
"label2": "value2",
}
assert created_pod.metadata.annotations == {
"annotation1": "value1",
"annotation2": "value2",
}
arguments = created_pod.spec.containers[0].args
labels_config = {
conf.split("=")[0]: conf.split("=")[1]
for conf in arguments
if conf.startswith("spark.kubernetes.executor.label.")
}
annotations_config = {
conf.split("=")[0]: conf.split("=")[1]
for conf in arguments
if conf.startswith("spark.kubernetes.executor.annotation.")
}
assert labels_config == {
"spark.kubernetes.executor.label.label3": "value3",
"spark.kubernetes.executor.label.label4": "value4",
}
assert annotations_config == {
"spark.kubernetes.executor.annotation.annotation3": "value3",
"spark.kubernetes.executor.annotation.annotation4": "value4",
}

0 comments on commit e797a58

Please sign in to comment.