diff --git a/elyra/pipeline/airflow/processor_airflow.py b/elyra/pipeline/airflow/processor_airflow.py index c7449c6f8..e53629bf4 100644 --- a/elyra/pipeline/airflow/processor_airflow.py +++ b/elyra/pipeline/airflow/processor_airflow.py @@ -345,6 +345,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance "cpu_limit": operation.cpu_limit, "memory_limit": operation.memory_limit, "gpu_limit": operation.gpu, + "gpu_vendor": operation.gpu_vendor, "operator_source": operation.filename, } @@ -598,13 +599,23 @@ def render_volumes(self, elyra_properties: Dict[str, ElyraProperty]) -> str: str_to_render = "" for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []): str_to_render += f""" - Volume(name="{v.pvc_name}", configs={{"persistentVolumeClaim": {{"claimName": "{v.pvc_name}"}}}}),""" + k8s.V1Volume( + name="{v.pvc_name}", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( + claim_name="{v.pvc_name}", + ), + ),""" # set custom shared memory size shm = elyra_properties.get(pipeline_constants.KUBERNETES_SHARED_MEM_SIZE) if shm is not None and shm.size: - config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": "{shm.size}{shm.units}"}}}}""" str_to_render += f""" - Volume(name="shm", {config}),""" + k8s.V1Volume( + name="shm", + empty_dir=k8s.V1EmptyDirVolumeSource( + medium="Memory", + size_limit="{shm.size}{shm.units}", + ), + ),""" return dedent(str_to_render) def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str: @@ -615,8 +626,12 @@ def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str: str_to_render = "" for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []): str_to_render += f""" - VolumeMount(name="{v.pvc_name}", mount_path="{v.path}", - sub_path="{v.sub_path}", read_only={v.read_only}),""" + k8s.V1VolumeMount( + name="{v.pvc_name}", + mount_path="{v.path}", + sub_path="{v.sub_path}", + read_only={v.read_only}, + ),""" return dedent(str_to_render) def render_secrets(self, elyra_properties: Dict[str, ElyraProperty], cos_secret: Optional[str]) -> str: diff --git a/elyra/templates/airflow/airflow_template.jinja2 b/elyra/templates/airflow/airflow_template.jinja2 index fc68a55f6..b9245695f 100644 --- a/elyra/templates/airflow/airflow_template.jinja2 +++ b/elyra/templates/airflow/airflow_template.jinja2 @@ -1,15 +1,15 @@ from airflow import DAG -from airflow.utils.dates import days_ago +import pendulum args = { 'project_id' : '{{ pipeline_name }}', } dag = DAG( - '{{ pipeline_name }}', + dag_id='{{ pipeline_name }}', default_args=args, - schedule_interval='@once', - start_date=days_ago(1), + schedule='@once', + start_date=pendulum.today('UTC').add(days=-1), description=""" {{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }} """, @@ -22,10 +22,9 @@ dag = DAG( {{import_statement}} {% endfor %} {% else %} -from airflow.kubernetes.secret import Secret -from airflow.contrib.kubernetes.volume import Volume -from airflow.contrib.kubernetes.volume_mount import VolumeMount -from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret +from kubernetes.client import models as k8s +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator {% endif %} {% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %} @@ -48,23 +47,27 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n task_id='{{ operation.notebook|regex_replace }}', env_vars={{ operation.pipeline_envs }}, {% if operation.cpu_request or operation.mem_request or operation.cpu_limit or operation.memory_limit or operation.gpu_limit %} - resources = { - {% if operation.cpu_request %} - 'request_cpu': '{{ operation.cpu_request }}', - {% endif %} - {% if operation.mem_request %} - 'request_memory': '{{ operation.mem_request }}G', - {% endif %} - {% if operation.cpu_limit %} - 'limit_cpu': '{{ operation.cpu_limit }}', - {% endif %} - {% if operation.memory_limit %} - 'limit_memory': '{{ operation.memory_limit }}G', - {% endif %} - {% if operation.gpu_limit %} - 'limit_gpu': '{{ operation.gpu_limit }}', - {% endif %} - }, + container_resources=k8s.V1ResourceRequirements( + requests={ + {% if operation.cpu_request %} + 'cpu': '{{ operation.cpu_request }}', + {% endif %} + {% if operation.mem_request %} + 'memory': '{{ operation.mem_request }}G', + {% endif %} + }, + limits={ + {% if operation.cpu_limit %} + 'cpu': '{{ operation.cpu_limit }}', + {% endif %} + {% if operation.memory_limit %} + 'memory': '{{ operation.memory_limit }}G', + {% endif %} + {% if operation.gpu_limit %} + '{{ operation.gpu_vendor }}': '{{ operation.gpu_limit }}', + {% endif %} + } + ), {% endif %} volumes=[{{ processor.render_volumes(operation.elyra_props) }}], volume_mounts=[{{ processor.render_mounts(operation.elyra_props) }}], @@ -73,7 +76,7 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n labels={{ processor.render_labels(operation.elyra_props) }}, tolerations=[{{ processor.render_tolerations(operation.elyra_props) }}], in_cluster={{ in_cluster }}, - config_file="{{ kube_config_path }}", + config_file={% if kube_config_path is string %}"{{ kube_config_path }}"{% else %}{{ kube_config_path }}{% endif %}, {% endif %} dag=dag) {% if operation.image_pull_policy %} diff --git a/elyra/tests/pipeline/airflow/test_processor_airflow.py b/elyra/tests/pipeline/airflow/test_processor_airflow.py index 7217aae7a..30e387f36 100644 --- a/elyra/tests/pipeline/airflow/test_processor_airflow.py +++ b/elyra/tests/pipeline/airflow/test_processor_airflow.py @@ -195,7 +195,7 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic with open(response) as f: file_as_lines = f.read().splitlines() - assert "from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator" in file_as_lines + assert "from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator" in file_as_lines # Check DAG project name for i in range(len(file_as_lines)):