Skip to content

Commit c07eccb

Browse files
authored
Merge pull request #24 from DouweM/airflow-dag-gen
Read apps.yml directly from Airflow DAG generator
2 parents bca2f43 + 14cdea4 commit c07eccb

File tree

4 files changed

+66
-188
lines changed

4 files changed

+66
-188
lines changed

apps.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
schedules:
22
- name: hello-world
3-
interval: '0 * * * *' # On the hour
3+
interval: '0 * * * *' # Every hour
44
job: hello-world
55

66
# - name: <app>

meltano.yml

-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,3 @@ environments:
88
include_paths:
99
- "./plugins/*.meltano.yml"
1010
- "./apps/**/pixbyt.yml"
11-
- "./apps.yml"

plugins/airflow/dags/meltano_dag_generator.py

-186
This file was deleted.

plugins/airflow/dags/pixbyt.py

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import os
2+
import yaml
3+
import logging
4+
from datetime import datetime, timedelta
5+
from pathlib import Path
6+
7+
from airflow import DAG
8+
from airflow.operators.bash import BashOperator
9+
10+
logger = logging.getLogger(__name__)
11+
12+
DEFAULT_DAG_OPTS = {
13+
"catchup": False,
14+
"max_active_runs": 1,
15+
"default_args": {
16+
"owner": "airflow",
17+
"depends_on_past": False,
18+
"email_on_failure": False,
19+
"email_on_retry": False,
20+
"catchup": False,
21+
"retries": 1,
22+
"retry_delay": timedelta(minutes=5),
23+
"concurrency": 1,
24+
"start_date": datetime(1970, 1, 1, 0, 0, 0),
25+
}
26+
}
27+
DEFAULT_INTERVAL = "*/15 * * * *"
28+
29+
PROJECT_ROOT = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
30+
MELTANO_EXECUTABLE = ".meltano/run/bin"
31+
APPS_FILENAME = "apps.yml"
32+
33+
apps_path = Path(PROJECT_ROOT).joinpath(APPS_FILENAME)
34+
# An empty (or comment-only) apps.yml parses to None; fall back to an empty
# mapping so DAG discovery yields no DAGs instead of failing on import.
apps_config = yaml.safe_load(apps_path.read_text()) or {}
35+
36+
# A `schedules:` key whose entries are all commented out parses to None, and
# .get()'s default only applies when the key is missing, so coerce explicitly.
schedules = (apps_config or {}).get("schedules") or []
37+
38+
for schedule in schedules:
39+
name = schedule.get("name")
40+
41+
if not name:
42+
logger.warning("Skipping app without a name")
43+
continue
44+
45+
interval = schedule.get("interval", DEFAULT_INTERVAL)
46+
job = schedule.get("job", name)
47+
48+
# An `env:` key with no value parses to None; treat it like a missing key so
# the .items() call below does not fail.
env = schedule.get("env") or {}
49+
env = {k: str(v) for k, v in env.items()}
50+
51+
dag_id = name.replace("/", "--")
52+
with DAG(dag_id, schedule=interval, **DEFAULT_DAG_OPTS) as dag:
53+
cmd = f"{MELTANO_EXECUTABLE} run {job}"
54+
55+
task = BashOperator(
56+
dag=dag,
57+
task_id="run",
58+
cwd=str(PROJECT_ROOT),
59+
bash_command=cmd,
60+
env=env,
61+
append_env=True,
62+
)
63+
globals()[dag_id] = dag
64+
65+
logger.info(f"Created DAG '{dag_id}': interval='{interval}', cmd='{cmd}', env={env}")

0 commit comments

Comments
 (0)