Skip to content

Commit

Permalink
Improve airflow integration (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek authored Jul 11, 2024
1 parent 6e777b0 commit 0841817
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
16 changes: 8 additions & 8 deletions opendbt/airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ def load_dbt_tasks(self,
end_node: BaseOperator = None,
tag: str = None,
resource_type="all",
run_dbt_seeds=False,
run_singular_tests=False) -> Tuple[BaseOperator, BaseOperator]:
include_dbt_seeds=False,
include_singular_tests=False) -> Tuple[BaseOperator, BaseOperator]:
"""
This method is used to add dbt tasks to the given DAG.
Expand All @@ -81,7 +81,7 @@ def load_dbt_tasks(self,
dag=dag)
end_node = end_node if end_node else EmptyOperator(task_id='dbt-%s-end' % self.project_dir.name, dag=dag)

if run_dbt_seeds:
if include_dbt_seeds:
# add dbt seeds job after start node and before all other dbt jobs
first_node = start_node
start_node = OpenDbtExecutorOperator(dag=dag,
Expand All @@ -106,7 +106,7 @@ def load_dbt_tasks(self,
if resource_type == "test" and not str(node.name).startswith("source_"):
if node.resource_type == "test":
dbt_tasks[node.unique_id] = OpenDbtExecutorOperator(dag=dag,
task_id=node.unique_id.rsplit('.', 1)[0],
task_id=node.unique_id,
project_dir=self.project_dir,
profiles_dir=self.profiles_dir,
target=self.target,
Expand All @@ -132,7 +132,7 @@ def load_dbt_tasks(self,
# we are skipping model tests because they are included above with model execution (`build` command)
# source table tests
dbt_tasks[node.unique_id] = OpenDbtExecutorOperator(dag=dag,
task_id=node.unique_id.rsplit('.', 1)[0],
task_id=node.unique_id,
project_dir=self.project_dir,
profiles_dir=self.profiles_dir,
target=self.target,
Expand All @@ -153,7 +153,7 @@ def load_dbt_tasks(self,
task.set_upstream(dbt_tasks[upstream_id])

singular_tests = None
if run_singular_tests:
if include_singular_tests:
singular_tests = OpenDbtExecutorOperator(dag=dag,
task_id=f"{self.project_dir.name}_singular_tests",
project_dir=self.project_dir,
Expand All @@ -167,7 +167,7 @@ def load_dbt_tasks(self,
# set downstream dependencies for the end nodes.
self.log.debug(f"Setting downstream of {task.task_id} -> {end_node.task_id}")

if run_singular_tests and singular_tests:
if include_singular_tests and singular_tests:
task.set_downstream(singular_tests)
else:
task.set_downstream(end_node)
Expand All @@ -177,6 +177,6 @@ def load_dbt_tasks(self,
self.log.debug(f"Setting upstream of {task.task_id} -> {start_node}")
task.set_upstream(start_node)

if run_singular_tests:
if include_singular_tests:
singular_tests.set_downstream(end_node)
return start_node, end_node
32 changes: 32 additions & 0 deletions tests/resources/airflow/dags/dbt_tests_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from pathlib import Path

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago

from opendbt.airflow import OpenDbtAirflowProject

# Default task arguments handed to the DAG below through `default_args`.
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1
}

# DAG definition for the dbt tests workflow: it loads the dbt project's tasks
# with resource_type='test' between an explicit start and end task.
# schedule_interval=None means no schedule is configured for this DAG.
with DAG(
dag_id='dbt_tests_workflow',
default_args=default_args,
description='DAG To run dbt tests',
schedule_interval=None,
start_date=days_ago(3),
catchup=False,
max_active_runs=1
) as dag:
# Boundary tasks passed to load_dbt_tasks as start_node / end_node.
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

# The same directory is used below as both project_dir and profiles_dir.
DBT_PROJ_DIR = Path("/opt/dbttest")

p = OpenDbtAirflowProject(project_dir=DBT_PROJ_DIR, profiles_dir=DBT_PROJ_DIR, target='dev')
# Only resource_type is overridden here; seeds and singular tests keep their
# defaults (include_dbt_seeds=False, include_singular_tests=False).
p.load_dbt_tasks(dag=dag, start_node=start, end_node=end, resource_type='test')
8 changes: 4 additions & 4 deletions tests/resources/airflow/dags/dbt_workflow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from pathlib import Path

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago

from airflow import DAG
from opendbt.airflow import OpenDbtAirflowProject

default_args = {
Expand All @@ -26,7 +26,7 @@
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

DBTTEST_DIR = Path("/opt/dbttest")
DBT_PROJ_DIR = Path("/opt/dbttest")

p = OpenDbtAirflowProject(project_dir=DBTTEST_DIR, profiles_dir=DBTTEST_DIR, target='dev')
p.load_dbt_tasks(dag=dag, start_node=start, end_node=end)
p = OpenDbtAirflowProject(project_dir=DBT_PROJ_DIR, profiles_dir=DBT_PROJ_DIR, target='dev')
p.load_dbt_tasks(dag=dag, start_node=start, end_node=end, include_singular_tests=True, include_dbt_seeds=True)

0 comments on commit 0841817

Please sign in to comment.