Skip to content

Commit

Permalink
Merge branch 'main' into reports-add-rt-check-order
Browse files Browse the repository at this point in the history
  • Loading branch information
owades authored Jul 31, 2023
2 parents 3173b6f + 1c06920 commit 4fd3899
Show file tree
Hide file tree
Showing 58 changed files with 3,626 additions and 1,277 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/build-calitp-data-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# CI for the calitp-data-analysis package: type-check, test, and build the
# distribution on every push to main and on every pull request that touches
# the package or this workflow file.
name: Test, visualize, and build calitp-data-analysis

on:
  push:
    branches:
      - 'main'
    paths:
      - '.github/workflows/build-calitp-data-analysis.yml'
      - 'packages/calitp-data-analysis/calitp_data_analysis/**'
  pull_request:
    paths:
      - '.github/workflows/build-calitp-data-analysis.yml'
      - 'packages/calitp-data-analysis/calitp_data_analysis/**'

# Cancel any in-flight run for the same workflow + ref so only the latest
# commit on a branch/PR is checked.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  check_and_build:
    name: check python
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      # NOTE(review): setup-gcloud@v0 with `service_account_key` /
      # `export_default_credentials` is deprecated upstream; v1+ expects a
      # separate google-github-actions/auth step. Confirm before upgrading.
      - uses: google-github-actions/setup-gcloud@v0
        with:
          service_account_key: ${{ secrets.GCP_SA_KEY }}
          export_default_credentials: true
      - name: Run checks
        working-directory: packages/calitp-data-analysis
        run: |
          curl -sSL https://install.python-poetry.org | python -
          poetry install
          poetry run mypy .
          poetry run pytest
          poetry build
          # TODO: add publishing
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@

import pendulum
import sentry_sdk
from calitp_data.storage import get_fs
from calitp_data_infra.storage import (
AirtableGTFSDataExtract,
AirtableGTFSDataRecord,
GCSFileInfo,
GTFSDownloadConfig,
GTFSDownloadConfigExtract,
get_fs,
)
from pydantic import ValidationError

Expand Down
4 changes: 1 addition & 3 deletions airflow/dags/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import macros
import requests
from calitp_data.templates import user_defined_filters, user_defined_macros
from gusty import create_dag

import airflow # noqa
Expand Down Expand Up @@ -56,8 +55,7 @@ def log_failure_to_slack(context):
task_group_defaults={"tooltip": "this is a default tooltip"},
wait_for_defaults={"retries": 24, "check_existence": True, "timeout": 10 * 60},
latest_only=False,
user_defined_macros={**user_defined_macros, **macros.data_infra_macros},
user_defined_filters=user_defined_filters,
user_defined_macros=macros.data_infra_macros,
default_args={
"on_failure_callback": log_failure_to_slack,
# "on_retry_callback": log_failure_to_slack,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import humanize
import pendulum
import sentry_sdk
from calitp_data.storage import get_fs
from calitp_data_infra.auth import get_secrets_by_label
from calitp_data_infra.storage import (
JSONL_EXTENSION,
Expand All @@ -22,6 +21,7 @@
PartitionedGCSArtifact,
ProcessingOutcome,
download_feed,
get_fs,
get_latest,
)
from pydantic import validator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
# trigger_rule: all_done
# ---
import datetime
import os

import pandas as pd
from calitp_data.config import is_development

from airflow.models.taskinstance import TaskInstance
from airflow.utils.email import send_email
Expand All @@ -32,7 +32,7 @@ def email_failures(task_instance: TaskInstance, execution_date, **kwargs):
{html_report}
"""

if is_development():
if os.environ["AIRFLOW_ENV"] == "development":
print(
f"Skipping since in development mode! Would have emailed {failures_df.shape[0]} failures."
)
Expand Down
29 changes: 6 additions & 23 deletions airflow/dags/macros.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,12 @@
"""Macros for Operators"""
import os

from calitp_data.config import is_development

# To add a macro, add its definition in the appropriate section
# And then add it to the dictionary at the bottom of this file

# Is Development ======================================================


def is_development_macro():
"""Make calitp-py's is_development function available via macro"""

return is_development()


# ACTUALLY DEFINE MACROS =============================================================

# template must be added here to be accessed in dags.py
# key is alias that will be used to reference the template in DAG tasks
# value is name of function template as defined above


data_infra_macros = {
"is_development": is_development_macro,
"image_tag": lambda: "development" if is_development() else "latest",
"image_tag": lambda: "development"
if os.environ["AIRFLOW_ENV"] == "development"
else "latest",
"get_project_id": lambda: "cal-itp-data-infra-staging"
if os.environ["AIRFLOW_ENV"] == "development"
else "cal-itp-data-infra",
"env_var": os.getenv,
}
2 changes: 1 addition & 1 deletion airflow/dags/parse_elavon/elavon_to_gcs_jsonl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

import pandas as pd
import pendulum
from calitp_data.storage import get_fs
from calitp_data_infra.storage import ( # type: ignore
PartitionedGCSArtifact,
get_fs,
make_name_bq_safe,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ arguments:
- '/app/scripts/publish.py'
- 'publish-exposure'
- 'california_open_data'
- '{% if is_development() %}--no-publish{% else %}--publish{% endif %}'
- '{% if env_var("AIRFLOW_ENV") == "development" %}--no-publish{% else %}--publish{% endif %}'

is_delete_operator_pod: true
get_logs: true
Expand Down
2 changes: 1 addition & 1 deletion airflow/dags/sync_elavon/elavon_to_gcs_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import paramiko
import pendulum
from calitp_data.storage import get_fs
from calitp_data_infra.auth import get_secret_by_name
from calitp_data_infra.storage import get_fs

CALITP__ELAVON_SFTP_HOSTNAME = os.environ["CALITP__ELAVON_SFTP_HOSTNAME"]
CALITP__ELAVON_SFTP_PORT = os.environ["CALITP__ELAVON_SFTP_PORT"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import pendulum
import sentry_sdk
import typer
from calitp_data.storage import get_fs
from calitp_data_infra.storage import (
GTFSFeedType,
GTFSScheduleFeedExtract,
PartitionedGCSArtifact,
ProcessingOutcome,
get_fs,
)
from tqdm import tqdm
from utils import (
Expand Down
1 change: 0 additions & 1 deletion airflow/plugins/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# flake8: noqa
from operators.airtable_to_gcs import AirtableToGCSOperator
from operators.amplitude_to_flattened_json import AmplitudeToFlattenedJSONOperator
from operators.external_table import ExternalTable
from operators.gtfs_csv_to_jsonl import GtfsGcsToJsonlOperator
from operators.gtfs_csv_to_jsonl_hourly import GtfsGcsToJsonlOperatorHourly
Expand Down
3 changes: 1 addition & 2 deletions airflow/plugins/operators/airtable_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@

import pandas as pd
import pendulum
from calitp_data.storage import get_fs
from calitp_data_infra.auth import get_secret_by_name
from calitp_data_infra.storage import make_name_bq_safe
from calitp_data_infra.storage import get_fs, make_name_bq_safe
from pyairtable import Table
from pydantic import BaseModel

Expand Down
146 changes: 0 additions & 146 deletions airflow/plugins/operators/amplitude_to_flattened_json.py

This file was deleted.

Loading

0 comments on commit 4fd3899

Please sign in to comment.