Skip to content

Commit

Permalink
Airflow 2.x generic pipeline modifications. See also #3166. Changed processor …
Browse files Browse the repository at this point in the history
…airflow and the Airflow Jinja2 template to comply with Airflow 2.x. Updated the pipeline test for processor airflow to use the new import location of KubernetesPodOperator in Airflow 2.x. Also added CPU and memory limit fields in the Airflow 2.x style.

Signed-off-by: Sven Thoms <[email protected]>
  • Loading branch information
shalberd committed Sep 3, 2024
1 parent e90d364 commit d4176b9
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 32 deletions.
25 changes: 20 additions & 5 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"cpu_limit": operation.cpu_limit,
"memory_limit": operation.memory_limit,
"gpu_limit": operation.gpu,
"gpu_vendor": operation.gpu_vendor,
"operator_source": operation.filename,
}

Expand Down Expand Up @@ -598,13 +599,23 @@ def render_volumes(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
str_to_render = ""
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
Volume(name="{v.pvc_name}", configs={{"persistentVolumeClaim": {{"claimName": "{v.pvc_name}"}}}}),"""
k8s.V1Volume(
name="{v.pvc_name}",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
claim_name="{v.pvc_name}",
),
),"""
# set custom shared memory size
shm = elyra_properties.get(pipeline_constants.KUBERNETES_SHARED_MEM_SIZE)
if shm is not None and shm.size:
config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": "{shm.size}{shm.units}"}}}}"""
str_to_render += f"""
Volume(name="shm", {config}),"""
k8s.V1Volume(
name="shm",
empty_dir=k8s.V1EmptyDirVolumeSource(
medium="Memory",
size_limit="{shm.size}{shm.units}",
),
),"""
return dedent(str_to_render)

def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
Expand All @@ -615,8 +626,12 @@ def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
str_to_render = ""
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
VolumeMount(name="{v.pvc_name}", mount_path="{v.path}",
sub_path="{v.sub_path}", read_only={v.read_only}),"""
k8s.V1VolumeMount(
name="{v.pvc_name}",
mount_path="{v.path}",
sub_path="{v.sub_path}",
read_only={v.read_only},
),"""
return dedent(str_to_render)

def render_secrets(self, elyra_properties: Dict[str, ElyraProperty], cos_secret: Optional[str]) -> str:
Expand Down
55 changes: 29 additions & 26 deletions elyra/templates/airflow/airflow_template.jinja2
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from airflow import DAG
from airflow.utils.dates import days_ago
import pendulum

args = {
'project_id' : '{{ pipeline_name }}',
}

dag = DAG(
'{{ pipeline_name }}',
dag_id='{{ pipeline_name }}',
default_args=args,
schedule_interval='@once',
start_date=days_ago(1),
schedule='@once',
start_date=pendulum.today('UTC').add(days=-1),
description="""
{{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }}
""",
Expand All @@ -22,10 +22,9 @@ dag = DAG(
{{import_statement}}
{% endfor %}
{% else %}
from airflow.kubernetes.secret import Secret
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
{% endif %}

{% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %}
Expand All @@ -48,23 +47,27 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
task_id='{{ operation.notebook|regex_replace }}',
env_vars={{ operation.pipeline_envs }},
{% if operation.cpu_request or operation.mem_request or operation.cpu_limit or operation.memory_limit or operation.gpu_limit %}
resources = {
{% if operation.cpu_request %}
'request_cpu': '{{ operation.cpu_request }}',
{% endif %}
{% if operation.mem_request %}
'request_memory': '{{ operation.mem_request }}G',
{% endif %}
{% if operation.cpu_limit %}
'limit_cpu': '{{ operation.cpu_limit }}',
{% endif %}
{% if operation.memory_limit %}
'limit_memory': '{{ operation.memory_limit }}G',
{% endif %}
{% if operation.gpu_limit %}
'limit_gpu': '{{ operation.gpu_limit }}',
{% endif %}
},
container_resources=k8s.V1ResourceRequirements(
requests={
{% if operation.cpu_request %}
'cpu': '{{ operation.cpu_request }}',
{% endif %}
{% if operation.mem_request %}
'memory': '{{ operation.mem_request }}G',
{% endif %}
},
limits={
{% if operation.cpu_limit %}
'cpu': '{{ operation.cpu_limit }}',
{% endif %}
{% if operation.memory_limit %}
'memory': '{{ operation.memory_limit }}G',
{% endif %}
{% if operation.gpu_limit %}
'{{ operation.gpu_vendor }}': '{{ operation.gpu_limit }}',
{% endif %}
}
),
{% endif %}
volumes=[{{ processor.render_volumes(operation.elyra_props) }}],
volume_mounts=[{{ processor.render_mounts(operation.elyra_props) }}],
Expand All @@ -73,7 +76,7 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
labels={{ processor.render_labels(operation.elyra_props) }},
tolerations=[{{ processor.render_tolerations(operation.elyra_props) }}],
in_cluster={{ in_cluster }},
config_file="{{ kube_config_path }}",
config_file={% if kube_config_path is string %}"{{ kube_config_path }}"{% else %}{{ kube_config_path }}{% endif %},
{% endif %}
dag=dag)
{% if operation.image_pull_policy %}
Expand Down
2 changes: 1 addition & 1 deletion elyra/tests/pipeline/airflow/test_processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
with open(response) as f:
file_as_lines = f.read().splitlines()

assert "from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator" in file_as_lines
assert "from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator" in file_as_lines

# Check DAG project name
for i in range(len(file_as_lines)):
Expand Down

0 comments on commit d4176b9

Please sign in to comment.