Merge pull request #25 from prefeitura-rio/staging/alertario_gypscie
changing scheduler
patriciacatandi authored Nov 13, 2024
2 parents 5dbd771 + bf16bde commit 9768425
Showing 5 changed files with 106 additions and 63 deletions.
56 changes: 31 additions & 25 deletions pipelines/meteorologia/precipitacao_alertario/flows.py
@@ -4,6 +4,7 @@
Flows for precipitacao_alertario.
"""
from datetime import timedelta
from threading import Thread

from prefect import Parameter, case # pylint: disable=E0611, E0401
from prefect.run_configs import KubernetesRun # pylint: disable=E0611, E0401
@@ -43,11 +44,8 @@
# from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.custom import wait_for_flow_run_with_timeout

# from pipelines.utils.dump_db.constants import constants as dump_db_constants
from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants

# preprocessing imports
from pipelines.utils.gypscie.tasks import ( # pylint: disable=E0611, E0401
from pipelines.utils.gypscie.tasks import ( # pylint: disable=E0611, E0401; timeout_flow,; monitor_flow,
access_api,
add_caracterization_columns_on_dfr,
convert_columns_type,
@@ -63,6 +61,10 @@
unzip_files,
)

# from pipelines.utils.dump_db.constants import constants as dump_db_constants
# from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants


wait_for_flow_run_with_5min_timeout = wait_for_flow_run_with_timeout(timeout=timedelta(minutes=5))

with Flow(
Expand All @@ -77,28 +79,28 @@
DUMP_MODE = "append"

# Materialization parameters
MATERIALIZE_AFTER_DUMP_OLD_API = Parameter(
"materialize_after_dump_old_api", default=False, required=False
)
MATERIALIZE_TO_DATARIO_OLD_API = Parameter(
"materialize_to_datario_old_api", default=False, required=False
)
# MATERIALIZE_AFTER_DUMP_OLD_API = Parameter(
# "materialize_after_dump_old_api", default=False, required=False
# )
# MATERIALIZE_TO_DATARIO_OLD_API = Parameter(
# "materialize_to_datario_old_api", default=False, required=False
# )
MATERIALIZE_AFTER_DUMP = Parameter("materialize_after_dump", default=False, required=False)
MATERIALIZE_TO_DATARIO = Parameter("materialize_to_datario", default=False, required=False)
# MATERIALIZE_TO_DATARIO = Parameter("materialize_to_datario", default=False, required=False)
MATERIALIZATION_MODE = Parameter("mode", default="dev", required=False)
TRIGGER_RAIN_DASHBOARD_UPDATE = Parameter(
"trigger_rain_dashboard_update", default=False, required=False
)
PREFECT_PROJECT = Parameter("prefect_project", default="staging", required=False)

# Dump to GCS after? Should only dump to GCS if materializing to datario
DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False)
# DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False)

MAXIMUM_BYTES_PROCESSED = Parameter(
"maximum_bytes_processed",
required=False,
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)
# MAXIMUM_BYTES_PROCESSED = Parameter(
# "maximum_bytes_processed",
# required=False,
# default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
# )

# Preprocessing gypscie parameters
preprocessing_gypscie = Parameter("preprocessing_gypscie", default=False, required=False)
@@ -107,11 +109,11 @@
environment_id = Parameter("environment_id", default=1, required=False)
domain_id = Parameter("domain_id", default=1, required=False)
project_id = Parameter("project_id", default=1, required=False)
project_name = Parameter("project_name", default="rionowcast_precipitation", required=False)
treatment_version = Parameter("treatment_version", default=1, required=False)
# gypscie_project_name = Parameter("project_name", default="rionowcast_precipitation", required=False) # noqa: E501
# treatment_version = Parameter("treatment_version", default=1, required=False)

# Gypscie processor parameters
processor_name = Parameter("processor_name", default="etl_alertario22", required=True)
processor_name = Parameter("processor_name", default="etl_alertario22", required=False)
    dataset_processor_id = Parameter("dataset_processor_id", default=43, required=False)  # TODO: change

load_data_function_id = Parameter("load_data_function_id", default=53, required=False)
@@ -141,13 +143,18 @@
source = Parameter("source", default="alertario", required=False)

    # Dataset path, if it was saved by the ETL flow; otherwise it will be None
dataset_path = Parameter("dataset_path", default=None, required=False) # dataset_path
# dataset_path = Parameter("dataset_path", default=None, required=False) # dataset_path
model_version = Parameter("model_version", default=1, required=False)

#########################
# Start alertario flow #
#########################

# timeout_flow(timeout_seconds=300)
    # Start the monitoring in a new thread
# monitor_thread = Thread(
# target=monitor_flow, args=(300, cor_meteorologia_precipitacao_alertario)
# )
# monitor_thread.start()
dfr_pluviometric, dfr_meteorological = download_data()
(dfr_pluviometric, empty_data_pluviometric,) = treat_pluviometer_and_meteorological_data(
dfr=dfr_pluviometric,
@@ -166,7 +173,7 @@
path_pluviometric, full_path_pluviometric = save_data(
dfr_pluviometric,
data_name="pluviometric",
treatment_version=treatment_version,
# treatment_version=treatment_version,
wait=empty_data_pluviometric,
)
# Create table in BigQuery
@@ -447,7 +454,6 @@
#####################################
# Start preprocessing gypscie flow #
#####################################

with case(empty_data_pluviometric, False):
with case(preprocessing_gypscie, True):
api = access_api()
@@ -483,7 +489,7 @@
project_id=project_id,
rain_gauge_data_id=register_dataset_response["id"],
rain_gauge_metadata_path=rain_gauge_metadata_path,
load_data_funtion_id=load_data_function_id,
load_data_function_id=load_data_function_id,
parse_date_time_function_id=parse_date_time_function_id,
drop_duplicates_function_id=drop_duplicates_function_id,
replace_inconsistent_values_function_id=replace_inconsistent_values_function_id,
48 changes: 23 additions & 25 deletions pipelines/meteorologia/precipitacao_alertario/schedules.py
@@ -22,33 +22,31 @@
parameter_defaults={
"trigger_rain_dashboard_update": True,
"materialize_after_dump_old_api": True,
"materialize_to_datario_old_api": True,
"materialize_after_dump": True,
"materialize_to_datario": False,
"mode": "prod",
"dump_to_gcs": False,
"maximum_bytes_processed": None,
"preprocessing_gypscie": True,
"workflow_id": 41,
"environment_id": 1,
"domain_id": 1,
"project_id": 1,
"project_name": "rionowcast_precipitation",
"treatment_version": 1,
"processor_name": "etl_alertario22",
"dataset_processor_id": 43,
"load_data_function_id": 53,
"parse_date_time_function_id": 54,
"drop_duplicates_function_id": 55,
"replace_inconsistent_values_function_id": 56,
"add_lat_lon_function_id": 57,
"save_data_function_id": 58,
"rain_gauge_metadata_path": 227,
"dataset_id_previsao_chuva": "clima_previsao_chuva",
"table_id_previsao_chuva": "preprocessamento_pluviometro_alertario",
"station_type": "rain_gauge",
"source": "alertario",
"model_version": 1,
# "dump_to_gcs": False,
# "maximum_bytes_processed": None,
"preprocessing_gypscie": False,
# "workflow_id": 41,
# "environment_id": 1,
# "domain_id": 1,
# "project_id": 1,
# "project_name": "rionowcast_precipitation",
# "treatment_version": 1,
# "processor_name": "etl_alertario22",
# "dataset_processor_id": 43,
# "load_data_function_id": 53,
# "parse_date_time_function_id": 54,
# "drop_duplicates_function_id": 55,
# "replace_inconsistent_values_function_id": 56,
# "add_lat_lon_function_id": 57,
# "save_data_function_id": 58,
# "rain_gauge_metadata_path": 227,
# "dataset_id_previsao_chuva": "clima_previsao_chuva",
# "table_id_previsao_chuva": "preprocessamento_pluviometro_alertario",
# "station_type": "rain_gauge",
# "source": "alertario",
# "model_version": 1,
},
),
]
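For context, here is a minimal sketch of how parameter defaults like the ones above are typically attached to a Prefect 1.x clock. The interval, start date, and agent label are illustrative assumptions, not values taken from this commit.

from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

# Illustrative schedule: interval, start_date and labels are assumptions,
# not values from this repository.
minute_schedule = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(minutes=5),
            start_date=datetime(2024, 1, 1, 0, 1, 0),
            labels=["example-agent"],
            parameter_defaults={
                "materialize_after_dump": True,
                "mode": "prod",
                "preprocessing_gypscie": False,
            },
        ),
    ]
)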
15 changes: 7 additions & 8 deletions pipelines/meteorologia/precipitacao_alertario/tasks.py
@@ -185,31 +185,30 @@ def save_data(
columns: str = None,
treatment_version: int = None,
data_type: str = "csv",
preffix: str = None,
suffix: bool = True,
rename: str = None,
wait=None, # pylint: disable=unused-argument
) -> Tuple[Union[str, Path], Union[str, Path]]:
"""
    Save the treated dataframe as CSV so it can be uploaded to GCP
"""

treatment_version = str(treatment_version) + "_" if treatment_version else ""

prepath = Path(f"/tmp/precipitacao_alertario/{data_name}")
prepath.mkdir(parents=True, exist_ok=True)

partition_column = "data_medicao"
treatment_version = str(treatment_version) + "_" if treatment_version else ""
suffix = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") if suffix else None
if columns:
dfr = dfr[columns]

# Remove partition columns if they already exist
new_partition_columns = ["ano_particao", "mes_particao", "data_particao"]
dfr = dfr.drop(columns=[col for col in new_partition_columns if col in dfr.columns])

log(f"Dataframe for {data_name} before partitions {dfr.iloc[0]}")
dataframe, partitions = parse_date_columns(dfr, partition_column)
log(f"Dataframe for {data_name} after partitions {dataframe.iloc[0]}")

suffix = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") if suffix else None
if columns:
dataframe = dataframe[columns + new_partition_columns]

full_paths = to_partitions(
data=dataframe,
partition_columns=partitions,
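As a quick orientation, a minimal sketch of calling save_data the way the flows.py hunk above does; the positional dataframe argument and the returned pair are inferred from that call site, since the full signature is not visible in this hunk.

# Hypothetical call mirroring the flows.py usage above.
path_pluviometric, full_path_pluviometric = save_data(
    dfr_pluviometric,              # treated dataframe (assumption: first positional argument)
    data_name="pluviometric",      # subdirectory under /tmp/precipitacao_alertario/
    wait=empty_data_pluviometric,  # upstream dependency, unused inside the task
)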
2 changes: 1 addition & 1 deletion pipelines/meteorologia/radar/mendanha/flows.py
@@ -108,7 +108,7 @@
environment_id = Parameter("environment_id", default=1, required=False)
domain_id = Parameter("domain_id", default=1, required=False)
project_id = Parameter("project_id", default=1, required=False)
project_name = Parameter("project_name", default="rionowcast_precipitation", required=False)
# project_name = Parameter("project_name", default="rionowcast_precipitation", required=False)

# Gypscie processor parameters
processor_name = Parameter("processor_name", default="etl_alertario22", required=True)
48 changes: 44 additions & 4 deletions pipelines/utils/gypscie/tasks.py
@@ -16,9 +16,9 @@
import pandas as pd
from basedosdados import Base # pylint: disable=E0611, E0401
from google.cloud import bigquery # pylint: disable=E0611, E0401
from prefect import task # pylint: disable=E0611, E0401
from prefect import context, task # pylint: disable=E0611, E0401
from prefect.engine.signals import ENDRUN # pylint: disable=E0611, E0401
from prefect.engine.state import Failed # pylint: disable=E0611, E0401
from prefect.engine.state import Failed, Skipped # pylint: disable=E0611, E0401

# pylint: disable=E0611, E0401
from prefeitura_rio.pipelines_utils.infisical import get_secret
@@ -33,6 +33,7 @@


# noqa E302, E303
# @task(timeout=300)
@task()
def access_api():
"""# noqa E303
@@ -346,7 +347,7 @@ def get_dataflow_alertario_params( # pylint: disable=too-many-arguments
project_id,
rain_gauge_data_id,
rain_gauge_metadata_path,
load_data_funtion_id,
load_data_function_id,
parse_date_time_function_id,
drop_duplicates_function_id,
replace_inconsistent_values_function_id,
@@ -392,7 +393,7 @@ def get_dataflow_alertario_params( # pylint: disable=too-many-arguments
"environment_id": environment_id,
"parameters": [
{
"function_id": load_data_funtion_id,
"function_id": load_data_function_id,
"params": {
"rain_gauge_data_path": rain_gauge_data_id,
"rain_gauge_metadata_path": rain_gauge_metadata_path,
@@ -836,3 +837,42 @@ def rename_files(
new_paths.append(savepath)
print(f"Renamed file paths: {new_paths}")
return new_paths


@task
def timeout_flow(
timeout_seconds: int = 600,
wait=None, # pylint: disable=unused-argument
):
"""
Stop flow if it exceeds timeout_seconds
"""
start_time = datetime.datetime.now(datetime.timezone.utc)
while True:
elapsed_time = datetime.datetime.now(datetime.timezone.utc) - start_time
if elapsed_time > datetime.timedelta(seconds=timeout_seconds):
stop_message = f"Time exceeded. Stop flow after {timeout_seconds} seconds"
log(stop_message)
task_state = Skipped(stop_message)
raise ENDRUN(state=task_state)
sleep(30)


def monitor_flow(timeout_seconds, flow_):
    """
    Parallel monitoring task that stops the flow
    if the total elapsed time exceeds the limit.
    """
    start_time = datetime.datetime.now(datetime.timezone.utc)
    while True:
        elapsed_time = datetime.datetime.now(datetime.timezone.utc) - start_time
        if elapsed_time > datetime.timedelta(seconds=timeout_seconds):
            log(f"elapsed_time {elapsed_time}")
            logger = context.get("logger")
            stop_message = (
                f"Time limit of {timeout_seconds} seconds exceeded. Stopping the flow."
            )
            logger.warning(stop_message)
            flow_.set_reference_tasks([Failed(stop_message)])  # Mark the flow as failed
            return
        sleep(10)  # Check the elapsed time every 10 seconds
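
For reference, a minimal sketch of the thread-based wiring that this commit leaves commented out in flows.py: monitor_flow runs in a background thread and marks the flow as failed once the timeout elapses. The daemon flag is an assumption; the timeout value and flow object come from that commented block.

from threading import Thread

# Sketch of the commented-out usage in flows.py; daemon=True is an assumption
# so the thread does not block interpreter shutdown.
monitor_thread = Thread(
    target=monitor_flow,
    args=(300, cor_meteorologia_precipitacao_alertario),  # timeout in seconds, flow object
    daemon=True,
)
monitor_thread.start()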
