#!/usr/bin/python
"""Daily job-trend pipeline: crawl each job site, then upload results to BigQuery."""

import os
from datetime import datetime, timedelta

from airflow import DAG
# NOTE(review): `airflow.operators.bash_operator` is the deprecated Airflow 1.x
# module path; on Airflow 2+ this should become
# `from airflow.operators.bash import BashOperator` — confirm installed version.
from airflow.operators.bash_operator import BashOperator

# Job sites to crawl; each entry fans out into one crawling task.
SITE_LIST = {"jumpit"}

DEFAULT_ARGS = {
    'owner': 'DE4E',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# BUG FIX: os.path.abspath(__file__) is the *file* path, not its directory, so
# "<file>.py/../script" can never resolve on the filesystem (a regular file is
# not a valid intermediate path component). Take dirname first, then normalize
# the "../" so the bash commands receive clean absolute paths.
DIR_PATH = os.path.dirname(os.path.abspath(__file__))
SCRIPT_PATH = os.path.normpath(os.path.join(DIR_PATH, "..", "script"))
DATA_PATH = os.path.normpath(os.path.join(DIR_PATH, "..", "data"))


with DAG(
    dag_id='job_trend_daily',
    default_args=DEFAULT_ARGS,
    # BUG FIX: without start_date an @daily schedule never produces a run.
    # Pin a fixed date and disable catchup so past intervals are not backfilled.
    start_date=datetime(2023, 11, 11),
    catchup=False,
    schedule_interval='@daily'
) as dag:
    # One crawling task per site; independent tasks run in parallel.
    crawling_tasks = [
        BashOperator(
            task_id=f'crawling_{site}',
            bash_command=f'python3 {SCRIPT_PATH}/crawling.py -s "{site}" -d {DATA_PATH}'
        ) for site in SITE_LIST
    ]

    # Upload starts only after every crawling task has succeeded.
    upload_task = BashOperator(
        task_id='upload_to_bigquery',
        bash_command=f'python3 {SCRIPT_PATH}/upload_to_bigquery.py'
    )

    crawling_tasks >> upload_task