Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring in calitp-data-analysis and remove calitp-data #2838

Merged
merged 12 commits into from
Jul 31, 2023
40 changes: 40 additions & 0 deletions .github/workflows/build-calitp-data-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Test, visualize, and build calitp-data-analysis

# Trigger on pushes to main and on all pull requests, but only when this
# workflow file itself or the calitp-data-analysis package source changes.
on:
  push:
    branches:
      - 'main'
    paths:
      - '.github/workflows/build-calitp-data-analysis.yml'
      - 'packages/calitp-data-analysis/calitp_data_analysis/**'
  pull_request:
    paths:
      - '.github/workflows/build-calitp-data-analysis.yml'
      - 'packages/calitp-data-analysis/calitp_data_analysis/**'

# Cancel any in-flight run for the same workflow + git ref so only the
# latest push is checked.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  check_and_build:
    name: check python
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      # NOTE(review): setup-gcloud@v0 with `service_account_key` is deprecated
      # upstream in favor of google-github-actions/auth followed by
      # setup-gcloud@v1+ — confirm before bumping versions.
      - uses: google-github-actions/setup-gcloud@v0
        with:
          service_account_key: ${{ secrets.GCP_SA_KEY }}
          export_default_credentials: true
      # Install Poetry, then type-check (mypy), run tests (pytest), and build
      # the sdist/wheel. Publishing to a package index is not wired up yet.
      - name: Run checks
        working-directory: packages/calitp-data-analysis
        run: |
          curl -sSL https://install.python-poetry.org | python -
          poetry install
          poetry run mypy .
          poetry run pytest
          poetry build
          # TODO: add publishing
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@

import pendulum
import sentry_sdk
from calitp_data.storage import get_fs
from calitp_data_infra.storage import (
AirtableGTFSDataExtract,
AirtableGTFSDataRecord,
GCSFileInfo,
GTFSDownloadConfig,
GTFSDownloadConfigExtract,
get_fs,
)
from pydantic import ValidationError

Expand Down
4 changes: 1 addition & 3 deletions airflow/dags/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import macros
import requests
from calitp_data.templates import user_defined_filters, user_defined_macros
from gusty import create_dag

import airflow # noqa
Expand Down Expand Up @@ -56,8 +55,7 @@ def log_failure_to_slack(context):
task_group_defaults={"tooltip": "this is a default tooltip"},
wait_for_defaults={"retries": 24, "check_existence": True, "timeout": 10 * 60},
latest_only=False,
user_defined_macros={**user_defined_macros, **macros.data_infra_macros},
user_defined_filters=user_defined_filters,
user_defined_macros=macros.data_infra_macros,
default_args={
"on_failure_callback": log_failure_to_slack,
# "on_retry_callback": log_failure_to_slack,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import humanize
import pendulum
import sentry_sdk
from calitp_data.storage import get_fs
from calitp_data_infra.auth import get_secrets_by_label
from calitp_data_infra.storage import (
JSONL_EXTENSION,
Expand All @@ -22,6 +21,7 @@
PartitionedGCSArtifact,
ProcessingOutcome,
download_feed,
get_fs,
get_latest,
)
from pydantic import validator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
# trigger_rule: all_done
# ---
import datetime
import os

import pandas as pd
from calitp_data.config import is_development

from airflow.models.taskinstance import TaskInstance
from airflow.utils.email import send_email
Expand All @@ -32,7 +32,7 @@ def email_failures(task_instance: TaskInstance, execution_date, **kwargs):
{html_report}
"""

if is_development():
if os.environ["AIRFLOW_ENV"] == "development":
print(
f"Skipping since in development mode! Would have emailed {failures_df.shape[0]} failures."
)
Expand Down
29 changes: 6 additions & 23 deletions airflow/dags/macros.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,12 @@
"""Macros for Operators"""
import os

from calitp_data.config import is_development

# To add a macro, add its definition in the appropriate section
# And then add it to the dictionary at the bottom of this file

# Is Development ======================================================


def is_development_macro():
"""Make calitp-py's is_development function available via macro"""

return is_development()


# ACTUALLY DEFINE MACROS =============================================================

# template must be added here to be accessed in dags.py
# key is alias that will be used to reference the template in DAG tasks
# value is name of function template as defined above


data_infra_macros = {
"is_development": is_development_macro,
"image_tag": lambda: "development" if is_development() else "latest",
"image_tag": lambda: "development"
if os.environ["AIRFLOW_ENV"] == "development"
else "latest",
"get_project_id": lambda: "cal-itp-data-infra-staging"
if os.environ["AIRFLOW_ENV"] == "development"
else "cal-itp-data-infra",
"env_var": os.getenv,
}
2 changes: 1 addition & 1 deletion airflow/dags/parse_elavon/elavon_to_gcs_jsonl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

import pandas as pd
import pendulum
from calitp_data.storage import get_fs
from calitp_data_infra.storage import ( # type: ignore
PartitionedGCSArtifact,
get_fs,
make_name_bq_safe,
)

Expand Down
2 changes: 1 addition & 1 deletion airflow/dags/sync_elavon/elavon_to_gcs_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import paramiko
import pendulum
from calitp_data.storage import get_fs
from calitp_data_infra.auth import get_secret_by_name
from calitp_data_infra.storage import get_fs

CALITP__ELAVON_SFTP_HOSTNAME = os.environ["CALITP__ELAVON_SFTP_HOSTNAME"]
CALITP__ELAVON_SFTP_PORT = os.environ["CALITP__ELAVON_SFTP_PORT"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import pendulum
import sentry_sdk
import typer
from calitp_data.storage import get_fs
from calitp_data_infra.storage import (
GTFSFeedType,
GTFSScheduleFeedExtract,
PartitionedGCSArtifact,
ProcessingOutcome,
get_fs,
)
from tqdm import tqdm
from utils import (
Expand Down
1 change: 0 additions & 1 deletion airflow/plugins/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# flake8: noqa
from operators.airtable_to_gcs import AirtableToGCSOperator
from operators.amplitude_to_flattened_json import AmplitudeToFlattenedJSONOperator
from operators.external_table import ExternalTable
from operators.gtfs_csv_to_jsonl import GtfsGcsToJsonlOperator
from operators.gtfs_csv_to_jsonl_hourly import GtfsGcsToJsonlOperatorHourly
Expand Down
3 changes: 1 addition & 2 deletions airflow/plugins/operators/airtable_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@

import pandas as pd
import pendulum
from calitp_data.storage import get_fs
from calitp_data_infra.auth import get_secret_by_name
from calitp_data_infra.storage import make_name_bq_safe
from calitp_data_infra.storage import get_fs, make_name_bq_safe
from pyairtable import Table
from pydantic import BaseModel

Expand Down
146 changes: 0 additions & 146 deletions airflow/plugins/operators/amplitude_to_flattened_json.py

This file was deleted.

Loading
Loading