Using Airflow to implement our ETL pipelines
- For the DAG naming convention, please see this article: 阿里巴巴大數據實戰
- Please refer to this article for the naming guideline
- ods/opening_crawler: Crawlers written by @Rain. Those openings can be used for the recruitment board, which was implemented by @tai271828 and @stacy.
- ods/survey_cake: A manually triggered uploader which uploads the questionnaire to BigQuery. The uploader should be invoked after we receive the SurveyCake questionnaire (see the sketch below).
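To make the layout concrete, here is a minimal sketch of what a manually triggered ods DAG could look like under this naming convention. The file path, DAG id, task id, and owner are hypothetical placeholders, not the actual implementation:

```python
# dags/ods/survey_cake_uploader.py  (hypothetical path and ids, for illustration only)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10.x import path

DEFAULT_ARGS = {
    "owner": "pycon-etl",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "start_date": datetime(2020, 1, 1),
}


def upload_survey_cake_csv_to_bigquery(**context):
    """Placeholder for the real upload logic (read the questionnaire, load it into BigQuery)."""


with DAG(
    "ODS_SURVEY_CAKE_UPLOADER",   # ods/<data source> naming, upper-cased as the DAG id
    default_args=DEFAULT_ARGS,
    schedule_interval=None,       # manually triggered, as described above
    catchup=False,
) as dag:
    PythonOperator(
        task_id="upload_survey_cake_csv_to_bigquery",
        python_callable=upload_survey_cake_csv_to_bigquery,
        provide_context=True,
    )
```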
docker pull puckel/docker-airflow:1.10.9
- Python dependencies:
virtualenv venv
. venv/bin/activate
pip install poetry
poetry install
- Npm dependencies, for linter, formatter and commit linter (optional):
brew install npm
npm ci
git add <files>
npm run check: Apply all the linters and formatters
npm run commit
. venv/bin/activate
. ./.env.sh
cd contrib
- Check the available commands in contrib/README.md
python xxx.py
You would need to set up the Snowflake connection manually; contact @davidtnfsh if you don't have those secrets. One way to register the connection from Python is sketched below.
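A minimal sketch, assuming a hypothetical conn_id (snowflake_default) and placeholder values; adjust it to whatever the DAGs actually expect:

```python
# Sketch only: register a Snowflake connection in Airflow's metadata DB.
# All values below are placeholders; ask @davidtnfsh for the real secrets.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="snowflake_default",      # hypothetical conn_id
    conn_type="snowflake",
    host="<account>.snowflakecomputing.com",
    login="<user>",
    password="<password>",
    schema="<database>.<schema>",
    extra='{"account": "<account>", "warehouse": "<warehouse>"}',
)

session = settings.Session()
# Only add the connection if it does not exist yet.
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()
```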
⚠ WARNING: About .env
Please don't use the .env for local development, or it might screw up the production tables.
- Build docker image:
- Build a production image (for production):
docker build -t davidtnfsh/pycon_etl:prod --cache-from davidtnfsh/pycon_etl:prod -f Dockerfile .
If you want to build the dev/test image, you also need to build this production image first, because the dev/test image is built on top of it. See below.
- Build a dev/test image (for dev/test):
docker build -t davidtnfsh/pycon_etl:test --cache-from davidtnfsh/pycon_etl:prod -f Dockerfile.test .
- Fill in some secrets:
- cp .env.template .env.staging for dev/test. Use cp .env.template .env.production instead if you are going to start a production instance.
- Follow the instructions in .env.<staging|production> and fill in your secrets. If you are just running the staging instance for development as a sandbox, and not going to access any specific third-party service, leaving .env.staging as-is should be fine (see the sketch below for how DAG code picks these values up).
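For reference, everything you put in the env file becomes an environment variable inside the container (via --env-file), so DAG code typically reads it with os.getenv. The variable name below is hypothetical, not necessarily what .env.template uses:

```python
# Sketch only: how a DAG might pick up a secret coming from the env file.
import os

SURVEYCAKE_TOKEN = os.getenv("SURVEYCAKE_TOKEN", "")  # hypothetical variable name

if not SURVEYCAKE_TOKEN:
    # Fine in the staging sandbox; calls to the third-party service just won't work.
    print("SURVEYCAKE_TOKEN is not set")
```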
- Start the Airflow server:
- production:
docker run --rm -p 80:8080 --name airflow -v $(pwd)/dags:/usr/local/airflow/dags -v $(pwd)/service-account.json:/usr/local/airflow/service-account.json --env-file=./.env.production davidtnfsh/pycon_etl:prod webserver
- dev/test:
docker run --rm -p 80:8080 --name airflow -v $(pwd)/dags:/usr/local/airflow/dags -v $(pwd)/service-account.json:/usr/local/airflow/service-account.json --env-file=./.env.staging davidtnfsh/pycon_etl:test webserver
- Note that the only differences are the env file name and the image tag.
- Enter localhost or 127.0.0.1 in the address bar of your browser to open the Airflow page.
  - If port 80 is already in use, you can choose a different host port in the -p argument to docker run, and then enter localhost:<host port> in the address bar.
- Set up GCP authentication: https://googleapis.dev/python/google-api-core/latest/auth.html
- After invoking gcloud auth application-default login, you'll get a credentials JSON file at $HOME/.config/gcloud/application_default_credentials.json. Invoke export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json" if you have it (a quick sanity check is sketched below).
- service-account.json: Please contact @david30907d via email, Telegram, or Discord. Don't worry about this JSON file if you are just running the sandbox staging instance for development.
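If you want a quick way to confirm which credentials and project the libraries will pick up, a small check like the following should work; it only assumes the google-auth and google-cloud-bigquery packages that the BigQuery example below already uses:

```python
# Sanity-check Application Default Credentials before running any DAG.
import google.auth
from google.cloud import bigquery

# Picks up GOOGLE_APPLICATION_CREDENTIALS or the gcloud application-default login file.
credentials, project_id = google.auth.default()
print("Authenticated, default project:", project_id)

# The BigQuery client reuses the same credentials; fall back to the project used in the example below.
client = bigquery.Client(project=project_id or "pycontw-225217", credentials=credentials)
print([dataset.dataset_id for dataset in client.list_datasets()])
```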
- Give Toy-Examples a try
- Manually deploy to a Google Compute Engine instance
- Fill out airflow.cfg with your Google OAuth ID and credential (Ref: setting-up-google-authentication)
Please check .github/workflows for details
BigQuery Example:
from google.cloud import bigquery
client = bigquery.Client(project='pycontw-225217')
# Perform a query.
QUERY = '''
SELECT scenario.day2checkin.attr.diet FROM `pycontw-225217.ods.ods_opass_attendee_timestamp`
'''
query_job = client.query(QUERY) # API request
rows = query_job.result() # Waits for query to finish
for row in rows:
    print(row.diet)