From 14cdea4b665b98652c5ed30ea2d8341cb7c33925 Mon Sep 17 00:00:00 2001
From: Douwe Maan
Date: Tue, 20 Feb 2024 21:29:29 -0600
Subject: [PATCH] Read apps.yml directly from Airflow DAG generator

---
 apps.yml                                      |   2 +-
 meltano.yml                                   |   1 -
 plugins/airflow/dags/meltano_dag_generator.py | 186 ------------------
 plugins/airflow/dags/pixbyt.py                |  65 ++++++
 4 files changed, 66 insertions(+), 188 deletions(-)
 delete mode 100644 plugins/airflow/dags/meltano_dag_generator.py
 create mode 100644 plugins/airflow/dags/pixbyt.py

diff --git a/apps.yml b/apps.yml
index 592f350..dc3b3b7 100644
--- a/apps.yml
+++ b/apps.yml
@@ -1,6 +1,6 @@
 schedules:
 - name: hello-world
-  interval: '0 * * * *' # On the hour
+  interval: '0 * * * *' # Every hour
   job: hello-world
 
 # - name:
diff --git a/meltano.yml b/meltano.yml
index ac4b89b..67e39c0 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -8,4 +8,3 @@ environments:
 include_paths:
 - "./plugins/*.meltano.yml"
 - "./apps/**/pixbyt.yml"
-- "./apps.yml"
diff --git a/plugins/airflow/dags/meltano_dag_generator.py b/plugins/airflow/dags/meltano_dag_generator.py
deleted file mode 100644
index e139d6a..0000000
--- a/plugins/airflow/dags/meltano_dag_generator.py
+++ /dev/null
@@ -1,186 +0,0 @@
-# If you want to define a custom DAG, create
-# a new file under plugins/airflow/dags/ and Airflow
-# will pick it up automatically.
-
-import json
-import logging
-import os
-import subprocess
-from collections.abc import Iterable
-
-from airflow import DAG
-
-try:
-    from airflow.operators.bash_operator import BashOperator
-except ImportError:
-    from airflow.operators.bash import BashOperator
-
-from datetime import datetime, timedelta
-from pathlib import Path
-
-logger = logging.getLogger(__name__)
-
-DEFAULT_ARGS = {
-    "owner": "airflow",
-    "depends_on_past": False,
-    "email_on_failure": False,
-    "email_on_retry": False,
-    "catchup": False,
-    "retries": 1,
-    "retry_delay": timedelta(minutes=5),
-    "concurrency": 1,
-}
-
-DEFAULT_TAGS = ["meltano"]
-PROJECT_ROOT = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
-MELTANO_BIN = ".meltano/run/bin"
-
-if not Path(PROJECT_ROOT).joinpath(MELTANO_BIN).exists():
-    logger.warning(
-        f"A symlink to the 'meltano' executable could not be found at '{MELTANO_BIN}'. Falling back on expecting it "
-        f"to be in the PATH instead. "
-    )
-    MELTANO_BIN = "meltano"
-
-
-def _meltano_elt_generator(schedules):
-    """Generate singular dag's for each legacy Meltano elt task.
-
-    Args:
-        schedules (list): List of Meltano schedules.
-    """
-    for schedule in schedules:
-        logger.info(f"Considering schedule '{schedule['name']}': {schedule}")
-        if not schedule["cron_interval"]:
-            logger.info(
-                f"No DAG created for schedule '{schedule['name']}' because its interval is set to `@once`.",
-            )
-            continue
-
-        args = DEFAULT_ARGS.copy()
-        if schedule["start_date"]:
-            args["start_date"] = schedule["start_date"]
-
-        dag_id = f"meltano_{schedule['name']}"
-
-        tags = DEFAULT_TAGS.copy()
-        if schedule["extractor"]:
-            tags.append(schedule["extractor"])
-        if schedule["loader"]:
-            tags.append(schedule["loader"])
-        if schedule["transform"] == "run":
-            tags.append("transform")
-        elif schedule["transform"] == "only":
-            tags.append("transform-only")
-
-        # from https://airflow.apache.org/docs/stable/scheduler.html#backfill-and-catchup
-        #
-        # It is crucial to set `catchup` to False so that Airflow only create a single job
-        # at the tail end of date window we want to extract data.
-        #
-        # Because our extractors do not support date-window extraction, it serves no
-        # purpose to enqueue date-chunked jobs for complete extraction window.
-        dag = DAG(
-            dag_id,
-            tags=tags,
-            catchup=False,
-            default_args=args,
-            schedule_interval=schedule["interval"],
-            max_active_runs=1,
-        )
-
-        elt = BashOperator(
-            task_id="extract_load",
-            bash_command=f"cd {PROJECT_ROOT}; {MELTANO_BIN} schedule run {schedule['name']}",
-            dag=dag,
-        )
-
-        # register the dag
-        globals()[dag_id] = dag
-        logger.info(f"DAG created for schedule '{schedule['name']}'")
-
-
-def _meltano_job_generator(schedules):
-    """Generate dag's for each task within a Meltano scheduled job.
-
-    Args:
-        schedules (list): List of Meltano scheduled jobs.
-    """
-    for schedule in schedules:
-        if not schedule.get("job"):
-            logger.info(
-                f"No DAG's created for schedule '{schedule['name']}'. It was passed to job generator but has no job."
-            )
-            continue
-        if not schedule["cron_interval"]:
-            logger.info(
-                f"No DAG created for schedule '{schedule['name']}' because its interval is set to `@once`."
-            )
-            continue
-
-        base_id = schedule['name']
-        common_tags = DEFAULT_TAGS.copy()
-        common_tags.append(f"schedule:{schedule['name']}")
-        common_tags.append(f"job:{schedule['job']['name']}")
-        interval = schedule["cron_interval"]
-        args = DEFAULT_ARGS.copy()
-        args["start_date"] = schedule.get("start_date", datetime(1970, 1, 1, 0, 0, 0))
-
-        with DAG(
-            base_id,
-            tags=common_tags,
-            catchup=False,
-            default_args=args,
-            schedule_interval=interval,
-            max_active_runs=1,
-        ) as dag:
-            previous_task = None
-            for idx, task in enumerate(schedule["job"]["tasks"]):
-                logger.info(
-                    f"Considering task '{task}' of schedule '{schedule['name']}': {schedule}"
-                )
-
-                task_id = f"{base_id}_task{idx}"
-
-                if isinstance(task, Iterable) and not isinstance(task, str):
-                    run_args = " ".join(task)
-                else:
-                    run_args = task
-
-                task = BashOperator(
-                    task_id=task_id,
-                    bash_command=f"cd {PROJECT_ROOT}; {MELTANO_BIN} run {run_args}",
-                    dag=dag,
-                )
-                if previous_task:
-                    task.set_upstream(previous_task)
-                previous_task = task
-                logger.info(
-                    f"Spun off task '{task}' of schedule '{schedule['name']}': {schedule}"
-                )
-
-        globals()[base_id] = dag
-        logger.info(f"DAG created for schedule '{schedule['name']}', task='{run_args}'")
-
-
-def create_dags():
-    """Create DAGs for Meltano schedules."""
-    list_result = subprocess.run(
-        [MELTANO_BIN, "schedule", "list", "--format=json"],
-        cwd=PROJECT_ROOT,
-        stdout=subprocess.PIPE,
-        universal_newlines=True,
-        check=True,
-    )
-    schedule_export = json.loads(list_result.stdout)
-
-    if schedule_export.get("schedules"):
-        logger.info(f"Received meltano v2 style schedule export: {schedule_export}")
-        _meltano_elt_generator(schedule_export["schedules"].get("elt"))
-        _meltano_job_generator(schedule_export["schedules"].get("job"))
-    else:
-        logger.info(f"Received meltano v1 style schedule export: {schedule_export}")
-        _meltano_elt_generator(schedule_export)
-
-
-create_dags()
diff --git a/plugins/airflow/dags/pixbyt.py b/plugins/airflow/dags/pixbyt.py
new file mode 100644
index 0000000..7ca6dad
--- /dev/null
+++ b/plugins/airflow/dags/pixbyt.py
@@ -0,0 +1,65 @@
+import os
+import yaml
+import logging
+from datetime import datetime, timedelta
+from pathlib import Path
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_DAG_OPTS = {
+    "catchup": False,
+    "max_active_runs": 1,
+    "default_args": {
+        "owner": "airflow",
+        "depends_on_past": False,
"email_on_failure": False, + "email_on_retry": False, + "catchup": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), + "concurrency": 1, + "start_date": datetime(1970, 1, 1, 0, 0, 0), + } +} +DEFAULT_INTERVAL = "*/15 * * * *" + +PROJECT_ROOT = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd()) +MELTANO_EXECUTABLE = ".meltano/run/bin" +APPS_FILENAME = "apps.yml" + +apps_path = Path(PROJECT_ROOT).joinpath(APPS_FILENAME) +apps_config = yaml.safe_load(apps_path.read_text()) + +schedules = apps_config.get("schedules", []) + +for schedule in schedules: + name = schedule.get("name") + + if not name: + logger.warning("Skipping app without a name") + continue + + interval = schedule.get("interval", DEFAULT_INTERVAL) + job = schedule.get("job", name) + + env = schedule.get("env", {}) + env = {k: str(v) for k, v in env.items()} + + dag_id = name.replace("/", "--") + with DAG(dag_id, schedule=interval, **DEFAULT_DAG_OPTS) as dag: + cmd = f"{MELTANO_EXECUTABLE} run {job}" + + task = BashOperator( + dag=dag, + task_id="run", + cwd=str(PROJECT_ROOT), + bash_command=cmd, + env=env, + append_env=True, + ) + globals()[dag_id] = dag + + logger.info(f"Created DAG '{dag_id}': interval='{interval}', cmd='{cmd}', env={env}")