Skip to content

Commit

Permalink
Merge pull request #8 from prefeitura-rio/staging/refatoracao-repo-2
Browse files: browse the repository at this point in the history
[WIP] Refatoração Pipelines
  • Loading branch information
pixuimpou authored Feb 21, 2024
2 parents 40e8249 + 9a50b8a commit 236b144
Show file tree
Hide file tree
Showing 43 changed files with 7,825 additions and 2,851 deletions.
32 changes: 32 additions & 0 deletions pipelines/br_rj_riodejaneiro_brt_gps/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
"""
Valores constantes para pipelines br_rj_riodejaneiro_brt_gps
"""

from enum import Enum


class constants(Enum): # pylint: disable=c0103
    """
    Constant values for the br_rj_riodejaneiro_brt_gps pipelines.

    Every member wraps a plain Python value; read it through ``.value``
    (for example ``constants.GPS_BRT_API_URL.value``).
    """

    # Dataset/table identifiers for the raw capture and the treated output.
    # NOTE(review): presumably BigQuery dataset and table ids -- confirm
    # against the upload/materialization tasks that consume them.
    GPS_BRT_RAW_DATASET_ID = "br_rj_riodejaneiro_brt_gps"
    GPS_BRT_RAW_TABLE_ID = "registros"
    GPS_BRT_DATASET_ID = "br_rj_riodejaneiro_veiculos"
    GPS_BRT_TREATED_TABLE_ID = "gps_brt"

    # Delay, in hours, handed to the materialization step (currently none).
    GPS_BRT_MATERIALIZE_DELAY_HOURS = 0

    # Endpoint queried for the BRT vehicle data.
    GPS_BRT_API_URL = "https://zn4.m2mcontrol.com.br/api/integracao/veiculos"
    # NOTE(review): looks like the name/path of a stored secret holding the
    # API credentials rather than the credentials themselves -- confirm.
    GPS_BRT_API_SECRET_PATH = "brt_api_v2"

    # Key-renaming table: API field name -> field name used by this project.
    GPS_BRT_MAPPING_KEYS = {
        "codigo": "id_veiculo",
        "linha": "servico",
        "latitude": "latitude",
        "longitude": "longitude",
        "dataHora": "timestamp_gps",
        "velocidade": "velocidade",
        "sentido": "sentido",
        "trajeto": "vista",
        # "inicio_viagem": "timestamp_inicio_viagem",
    }
30 changes: 15 additions & 15 deletions pipelines/br_rj_riodejaneiro_brt_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
# isort: on
# SMTR Imports #

from pipelines.br_rj_riodejaneiro_brt_gps.constants import constants as gps_constants
from pipelines.br_rj_riodejaneiro_brt_gps.tasks import (
pre_treatment_br_rj_riodejaneiro_brt_gps,
)
from pipelines.constants import constants
from pipelines.schedules import every_hour, every_minute
from pipelines.tasks import ( # get_local_dbt_client,; setup_task,
from pipelines.utils.backup.tasks import ( # get_local_dbt_client,; setup_task,
bq_upload,
create_date_hour_partition,
create_local_partition_path,
Expand Down Expand Up @@ -58,10 +58,10 @@
)

# Get default parameters #
raw_dataset_id = Parameter("raw_dataset_id", default=constants.GPS_BRT_RAW_DATASET_ID.value)
raw_table_id = Parameter("raw_table_id", default=constants.GPS_BRT_RAW_TABLE_ID.value)
dataset_id = Parameter("dataset_id", default=constants.GPS_BRT_DATASET_ID.value)
table_id = Parameter("table_id", default=constants.GPS_BRT_TREATED_TABLE_ID.value)
raw_dataset_id = Parameter("raw_dataset_id", default=gps_constants.GPS_BRT_RAW_DATASET_ID.value)
raw_table_id = Parameter("raw_table_id", default=gps_constants.GPS_BRT_RAW_TABLE_ID.value)
dataset_id = Parameter("dataset_id", default=gps_constants.GPS_BRT_DATASET_ID.value)
table_id = Parameter("table_id", default=gps_constants.GPS_BRT_TREATED_TABLE_ID.value)
rebuild = Parameter("rebuild", False)

LABELS = get_current_flow_labels()
Expand All @@ -80,7 +80,7 @@
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
mode=MODE,
delay_hours=constants.GPS_BRT_MATERIALIZE_DELAY_HOURS.value,
delay_hours=gps_constants.GPS_BRT_MATERIALIZE_DELAY_HOURS.value,
)
dataset_sha = fetch_dataset_sha(
dataset_id=dataset_id,
Expand Down Expand Up @@ -144,16 +144,16 @@
filename = parse_timestamp_to_string(timestamp)

filepath = create_local_partition_path(
dataset_id=constants.GPS_BRT_RAW_DATASET_ID.value,
table_id=constants.GPS_BRT_RAW_TABLE_ID.value,
dataset_id=gps_constants.GPS_BRT_RAW_DATASET_ID.value,
table_id=gps_constants.GPS_BRT_RAW_TABLE_ID.value,
filename=filename,
partitions=partitions,
)
# EXTRACT

raw_status = get_raw(
url=constants.GPS_BRT_API_URL.value,
headers=constants.GPS_BRT_API_SECRET_PATH.value,
url=gps_constants.GPS_BRT_API_URL.value,
headers=gps_constants.GPS_BRT_API_SECRET_PATH.value,
)

raw_filepath = save_raw_local(status=raw_status, file_path=filepath)
Expand All @@ -165,16 +165,16 @@
treated_filepath = save_treated_local(status=treated_status, file_path=filepath)
# LOAD
error = bq_upload(
dataset_id=constants.GPS_BRT_RAW_DATASET_ID.value,
table_id=constants.GPS_BRT_RAW_TABLE_ID.value,
dataset_id=gps_constants.GPS_BRT_RAW_DATASET_ID.value,
table_id=gps_constants.GPS_BRT_RAW_TABLE_ID.value,
filepath=treated_filepath,
raw_filepath=raw_filepath,
partitions=partitions,
status=treated_status,
)
upload_logs_to_bq(
dataset_id=constants.GPS_BRT_RAW_DATASET_ID.value,
parent_table_id=constants.GPS_BRT_RAW_TABLE_ID.value,
dataset_id=gps_constants.GPS_BRT_RAW_DATASET_ID.value,
parent_table_id=gps_constants.GPS_BRT_RAW_TABLE_ID.value,
timestamp=timestamp,
error=error,
)
Expand Down
9 changes: 7 additions & 2 deletions pipelines/br_rj_riodejaneiro_brt_gps/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
# SMTR Imports #

from pipelines.constants import constants
from pipelines.utils.utils import log_critical, map_dict_keys
from pipelines.utils.backup.utils import log_critical, map_dict_keys


from pipelines.br_rj_riodejaneiro_brt_gps.constants import constants as gps_constants

# Tasks #

Expand Down Expand Up @@ -58,7 +61,9 @@ def pre_treatment_br_rj_riodejaneiro_brt_gps(status: dict, timestamp):
df = pd.DataFrame(columns=columns) # pylint: disable=c0103

# map_dict_keys renames the data keys to match the project's data structure
df["content"] = [map_dict_keys(piece, constants.GPS_BRT_MAPPING_KEYS.value) for piece in data]
df["content"] = [
map_dict_keys(piece, gps_constants.GPS_BRT_MAPPING_KEYS.value) for piece in data
]
df[key_column] = [piece[key_column] for piece in data]
df["timestamp_gps"] = [piece["timestamp_gps"] for piece in data]
df["timestamp_captura"] = timestamp
Expand Down
Empty file added pipelines/capture/__init__.py
Empty file.
Empty file.
Loading

0 comments on commit 236b144

Please sign in to comment.