Merge branch 'main' into pre-commit-ci-update-config
mergify[bot] authored Sep 23, 2024
2 parents b9e2913 + c732c7a commit bf7628d
Showing 15 changed files with 721 additions and 376 deletions.
1 change: 1 addition & 0 deletions pipelines/constants.py
@@ -108,6 +108,7 @@ class constants(Enum):
},
}

# Infisical
INFISICAL_URL = "URL"
INFISICAL_USERNAME = "USERNAME"
INFISICAL_PASSWORD = "PASSWORD"
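
These enum members hold only the names under which the Infisical credentials are looked up; a tiny sketch of the usual access pattern (the environment-variable fallback is an assumption, not this repository's secret-loading code):

import os
from pipelines.constants import constants

# Resolve the secret by the name stored in the enum; illustrative only.
infisical_user = os.getenv(constants.INFISICAL_USERNAME.value)  # -> os.getenv("USERNAME")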
15 changes: 7 additions & 8 deletions pipelines/meteorologia/__init__.py
@@ -1,10 +1,9 @@
# -*- coding: utf-8 -*-
# from pipelines.meteorologia.meteorologia_inmet.flows import * # noqa
# from pipelines.meteorologia.meteorologia_redemet.flows import * # noqa
# from pipelines.meteorologia.precipitacao_alertario.flows import * # noqa
# from pipelines.meteorologia.precipitacao_cemaden.flows import * # noqa
# from pipelines.meteorologia.precipitacao_inea.flows import * # noqa
# from pipelines.meteorologia.precipitacao_websirene.flows import * # noqa

# from pipelines.meteorologia.satelite.flows import * # noqa
from pipelines.meteorologia.meteorologia_inmet.flows import * # noqa
from pipelines.meteorologia.meteorologia_redemet.flows import * # noqa
from pipelines.meteorologia.precipitacao_alertario.flows import * # noqa
from pipelines.meteorologia.precipitacao_cemaden.flows import * # noqa
from pipelines.meteorologia.precipitacao_inea.flows import * # noqa
from pipelines.meteorologia.precipitacao_websirene.flows import * # noqa
from pipelines.meteorologia.radar.mendanha.flows import * # noqa
from pipelines.meteorologia.satelite.flows import * # noqa
2 changes: 0 additions & 2 deletions pipelines/meteorologia/radar/mendanha/flows.py
@@ -98,7 +98,6 @@
radar_files = download_files_storage(
bucket_name=BUCKET_NAME,
files_to_download=files_on_storage_list,
# destination_path=f"{BASE_PATH}radar_data/",
destination_path="temp/",
)
combined_radar = combine_radar_files(radar_files)
@@ -148,7 +147,6 @@
# saved_with_background_img_path = save_images_to_local(
# {formatted_time: img_bytes_with_backgroud}
# )
# # save_images_to_local.set_upstream(formatted_time)
# destination_blob_name, source_file_name = get_storage_destination(
# formatted_time, saved_with_background_img_path
# )
2 changes: 1 addition & 1 deletion pipelines/meteorologia/radar/mendanha/tasks.py
@@ -212,7 +212,7 @@ def get_radar_parameters(radar) -> Union[Tuple, Tuple]:
grid_limits = (z_limits, y_limits, x_limits)

# Number of vertical levels
z_levels = 21  # adjust as needed
z_levels = 20  # adjust as needed

# Resolution in x and y (number of points on the horizontal grid)
x_points = int((x_limits[1] - x_limits[0]) / radar.range["meters_between_gates"])
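
For context, these grid parameters are the kind of input Py-ART's gridding routine consumes; a minimal sketch of that wiring (the helper below is an assumption based on this diff, not the repository's actual code):

# Sketch: turning radar extents into a Cartesian grid with Py-ART.
# Assumes a Py-ART Radar object whose range metadata carries
# "meters_between_gates", as in the diff above.
import pyart

def grid_radar(radar, z_limits=(0, 10_000), z_levels=20):
    max_range = float(radar.range["data"].max())
    x_limits = y_limits = (-max_range, max_range)
    gate_spacing = radar.range["meters_between_gates"]
    x_points = int((x_limits[1] - x_limits[0]) / gate_spacing)
    y_points = int((y_limits[1] - y_limits[0]) / gate_spacing)
    # grid_shape is (nz, ny, nx); grid_limits is ((zmin, zmax), (ymin, ymax), (xmin, xmax))
    return pyart.map.grid_from_radars(
        radar,
        grid_shape=(z_levels, y_points, x_points),
        grid_limits=(z_limits, y_limits, x_limits),
    )

Lowering z_levels from 21 to 20 only coarsens the vertical axis of the interpolated grid.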
15 changes: 0 additions & 15 deletions pipelines/meteorologia/satelite/constants.py

This file was deleted.

187 changes: 132 additions & 55 deletions pipelines/meteorologia/satelite/flows.py
@@ -1,17 +1,31 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
# flake8: noqa: E501
"""
Flows for emd
Flows for emd.
"""
from copy import deepcopy

from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefeitura_rio.pipelines_utils.custom import Flow # pylint: disable=E0611, E0401
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

# from pipelines.utils.tasks import (
# create_table_and_upload_to_gcs,
# get_current_flow_labels,
# )
from prefeitura_rio.pipelines_utils.tasks import ( # pylint: disable=E0611, E0401
create_table_and_upload_to_gcs,
get_now_datetime,
task_run_dbt_model_task,
)

from pipelines.constants import constants
from pipelines.meteorologia.satelite.constants import constants as satelite_constants
from pipelines.meteorologia.satelite.schedules import (
aod,
cmip,
@@ -22,51 +36,64 @@
sst,
tpw,
)
from pipelines.meteorologia.satelite.tasks import (
create_image_and_upload_to_api,

# from pipelines.utils.constants import constants as utils_constants
from pipelines.meteorologia.satelite.tasks import ( # create_image,
download,
generate_point_value,
get_dates,
save_data,
slice_data,
tratar_dados,
)
from pipelines.tasks import get_on_redis, save_on_redis
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
get_current_flow_labels,
from pipelines.tasks import ( # pylint: disable=E0611, E0401
task_build_redis_hash,
task_create_partitions,
task_get_redis_client,
task_get_redis_output,
task_save_on_redis,
)

# from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


with Flow(
name="COR: Meteorologia - Satelite GOES 16",
code_owners=[
"paty",
state_handlers=[
handler_initialize_sentry,
handler_inject_bd_credentials,
],
parallelism=10,
skip_if_running=False,
) as cor_meteorologia_goes16:

# Materialization parameters
materialize_after_dump = Parameter("materialize_after_dump", default=False, required=False)
materialize_to_datario = Parameter("materialize_to_datario", default=False, required=False)
materialization_mode = Parameter("mode", default="dev", required=False)
# materialize_to_datario = Parameter("materialize_to_datario", default=False, required=False)
# materialization_mode = Parameter("mode", default="dev", required=False)

# Other parameters
dataset_id = satelite_constants.DATASET_ID.value
dataset_id = mode_redis = Parameter("dataset_id", default="clima_satelite", required=False)
band = Parameter("band", default=None, required=False)()
product = Parameter("product", default=None, required=False)()
table_id = Parameter("table_id", default=None, required=False)()
dump_mode = "append"
mode_redis = Parameter("mode_redis", default="prod", required=False)
ref_filename = Parameter("ref_filename", default=None, required=False)
current_time = Parameter("current_time", default=None, required=False)
create_image = Parameter("create_image", default=False, required=False)
# create_image = Parameter("create_image", default=False, required=False)
create_point_value = Parameter("create_point_value", default=False, required=False)

# Starting tasks
current_time = get_dates(current_time, product)

date_hour_info = slice_data(current_time=current_time, ref_filename=ref_filename)

# # Get filenames that were already treated on redis
redis_files = get_on_redis(dataset_id, table_id, mode=mode_redis)
# redis_files = get_on_redis(dataset_id, table_id, mode=mode_redis)
redis_client = task_get_redis_client(infisical_secrets_path="/redis")
redis_key = task_build_redis_hash(dataset_id, table_id, mode=mode_redis)
redis_files = task_get_redis_output(redis_client, redis_key=redis_key)
# redis_files = []

# Download raw data from API
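
The Redis wiring above replaces the old get_on_redis/save_on_redis helpers with explicit client, key, and output tasks. The underlying dedup idea, sketched with plain redis-py (key layout and helper names here are illustrative assumptions, not the internals of the repository's tasks):

# Sketch: skip files already processed; persist the updated list, trimmed.
import redis

client = redis.Redis(host="localhost", port=6379, decode_responses=True)
redis_key = "prod.clima_satelite.<table_id>"  # hypothetical key layout

def filter_new_files(candidates: list[str]) -> list[str]:
    seen = set(client.lrange(redis_key, 0, -1))
    return [name for name in candidates if name not in seen]

def save_processed(names: list[str], keep_last: int = 50) -> None:
    if names:
        client.rpush(redis_key, *names)
        client.ltrim(redis_key, -keep_last, -1)  # mirrors keep_last=50 above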
@@ -85,51 +112,85 @@
path, output_filepath = save_data(info=info, mode_redis=mode_redis)

# Create table in BigQuery
upload_table = create_table_and_upload_to_gcs(
create_table = create_table_and_upload_to_gcs(
data_path=path,
dataset_id=dataset_id,
table_id=table_id,
dump_mode=dump_mode,
wait=path,
biglake_table=False,
)

# Save new filenames on redis
save_on_redis(
dataset_id,
table_id,
mode_redis,
redis_files_updated,
task_save_on_redis(
redis_client=redis_client,
values=redis_files_updated,
redis_key=redis_key,
keep_last=50,
wait=path,
)

with case(create_image, True):
create_image_and_upload_to_api(info, output_filepath)
# with case(create_image, True):
# create_image_and_upload_to_api(info, output_filepath)

with case(create_point_value, True):
now_datetime = get_now_datetime()
df_point_values = generate_point_value(info, output_filepath)
point_values_path = task_create_partitions(
df_point_values,
partition_date_column="data_medicao",
# partition_columns=["ano_particao", "mes_particao", "data_particao"],
savepath="metricas_geoespaciais_goes16",
suffix=now_datetime,
)
create_table_point_value = create_table_and_upload_to_gcs(
data_path=point_values_path,
dataset_id=dataset_id,
table_id="metricas_geoespaciais_goes16",
dump_mode=dump_mode,
biglake_table=False,
)

# Trigger DBT flow run
with case(materialize_after_dump, True):
current_flow_labels = get_current_flow_labels()

materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"materialize_to_datario": materialize_to_datario,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
run_dbt = task_run_dbt_model_task(
dataset_id=dataset_id,
table_id=table_id,
# mode=materialization_mode,
# materialize_to_datario=materialize_to_datario,
)
run_dbt.set_upstream(create_table)

materialization_flow.set_upstream(upload_table)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
run_dbt_point_value = task_run_dbt_model_task(
dataset_id=dataset_id,
table_id="metricas_geoespaciais_goes16",
# mode=materialization_mode,
# materialize_to_datario=materialize_to_datario,
)
run_dbt_point_value.set_upstream(create_table_point_value)

# current_flow_labels = get_current_flow_labels()

# materialization_flow = create_flow_run(
# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
# project_name=constants.PREFECT_DEFAULT_PROJECT.value,
# parameters={
# "dataset_id": dataset_id,
# "table_id": table_id,
# "mode": materialization_mode,
# "materialize_to_datario": materialize_to_datario,
# },
# labels=current_flow_labels,
# run_name=f"Materialize {dataset_id}.{table_id}",
# )

# materialization_flow.set_upstream(upload_table)

# wait_for_materialization = wait_for_flow_run(
# materialization_flow,
# stream_states=True,
# stream_logs=True,
# raise_final_state=True,
# )
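
Before upload, the point-value branch materializes Hive-style date partitions. A sketch of the layout such a partitioning step conventionally produces (the directory names come from the commented-out hint above; the helper itself is illustrative, not task_create_partitions' real body):

import pandas as pd
from pathlib import Path

def create_partitions(df: pd.DataFrame, partition_date_column: str, savepath: str, suffix: str) -> str:
    # One directory per day: ano_particao=YYYY/mes_particao=M/data_particao=YYYY-MM-DD
    dates = pd.to_datetime(df[partition_date_column])
    for day, chunk in df.groupby(dates.dt.date):
        part_dir = (
            Path(savepath)
            / f"ano_particao={day.year}"
            / f"mes_particao={day.month}"
            / f"data_particao={day}"
        )
        part_dir.mkdir(parents=True, exist_ok=True)
        chunk.to_csv(part_dir / f"{suffix}.csv", index=False)
    return savepath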


# para rodar na cloud
Expand All @@ -139,7 +200,9 @@
# labels=[constants.RJ_COR_AGENT_LABEL.value],
# )
cor_meteorologia_goes16_rrqpe = deepcopy(cor_meteorologia_goes16)
cor_meteorologia_goes16_rrqpe.name = "COR: Meteorologia - Satelite GOES 16 - RRQPE"
cor_meteorologia_goes16_rrqpe.name = (
"COR: Meteorologia - Satelite GOES 16 - RRQPE - Taxa de precipitação"
)
cor_meteorologia_goes16_rrqpe.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_goes16_rrqpe.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
@@ -148,7 +211,9 @@
cor_meteorologia_goes16_rrqpe.schedule = rrqpe

cor_meteorologia_goes16_tpw = deepcopy(cor_meteorologia_goes16)
cor_meteorologia_goes16_tpw.name = "COR: Meteorologia - Satelite GOES 16 - TPW"
cor_meteorologia_goes16_tpw.name = (
"COR: Meteorologia - Satelite GOES 16 - TPW - Quantidade de água precipitável"
)
cor_meteorologia_goes16_tpw.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_goes16_tpw.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
@@ -157,7 +222,9 @@
cor_meteorologia_goes16_tpw.schedule = tpw

cor_meteorologia_goes16_cmip = deepcopy(cor_meteorologia_goes16)
cor_meteorologia_goes16_cmip.name = "COR: Meteorologia - Satelite GOES 16 - CMIP"
cor_meteorologia_goes16_cmip.name = (
"COR: Meteorologia - Satelite GOES 16 - CMIP - Infravermelho longo banda 13"
)
cor_meteorologia_goes16_cmip.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_goes16_cmip.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
@@ -166,7 +233,9 @@
cor_meteorologia_goes16_cmip.schedule = cmip

cor_meteorologia_goes16_mcmip = deepcopy(cor_meteorologia_goes16)
cor_meteorologia_goes16_mcmip.name = "COR: Meteorologia - Satelite GOES 16 - MCMIP"
cor_meteorologia_goes16_mcmip.name = (
"COR: Meteorologia - Satelite GOES 16 - MCMIP - Nuvem e umidade"
)
cor_meteorologia_goes16_mcmip.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_goes16_mcmip.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
@@ -175,7 +244,9 @@
cor_meteorologia_goes16_mcmip.schedule = mcmip

cor_meteorologia_goes16_dsi = deepcopy(cor_meteorologia_goes16)
cor_meteorologia_goes16_dsi.name = "COR: Meteorologia - Satelite GOES 16 - DSI"
cor_meteorologia_goes16_dsi.name = (
"COR: Meteorologia - Satelite GOES 16 - DSI - Índices de estabilidade da atmosfera"
)
cor_meteorologia_goes16_dsi.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_goes16_dsi.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
@@ -184,7 +255,9 @@
cor_meteorologia_goes16_dsi.schedule = dsi

cor_meteorologia_goes16_lst = deepcopy(cor_meteorologia_goes16)
cor_meteorologia_goes16_lst.name = "COR: Meteorologia - Satelite GOES 16 - LST"
cor_meteorologia_goes16_lst.name = (
"COR: Meteorologia - Satelite GOES 16 - LST - Temperatura da superfície da terra"
)
cor_meteorologia_goes16_lst.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_goes16_lst.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
@@ -193,7 +266,9 @@
cor_meteorologia_goes16_lst.schedule = lst

cor_meteorologia_goes16_sst = deepcopy(cor_meteorologia_goes16)
cor_meteorologia_goes16_sst.name = "COR: Meteorologia - Satelite GOES 16 - SST"
cor_meteorologia_goes16_sst.name = (
"COR: Meteorologia - Satelite GOES 16 - SST - Temperatura da superfície do oceano"
)
cor_meteorologia_goes16_sst.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_goes16_sst.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
@@ -202,7 +277,9 @@
cor_meteorologia_goes16_sst.schedule = sst

cor_meteorologia_goes16_aod = deepcopy(cor_meteorologia_goes16)
cor_meteorologia_goes16_aod.name = "COR: Meteorologia - Satelite GOES 16 - AOD"
cor_meteorologia_goes16_aod.name = (
"COR: Meteorologia - Satelite GOES 16 - AOD - Profundidade óptica aerossol"
)
cor_meteorologia_goes16_aod.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_goes16_aod.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
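
Each GOES-16 product above is a deepcopy of the base flow with its own name, storage, run config, and schedule. A hypothetical helper that would factor out the repeated block (the agent label comes from the commented-out snippet near the top of this section; everything else mirrors the assignments in the diff):

def make_goes16_variant(base_flow, product_label: str, schedule):
    # Illustrative only; the commit repeats these assignments inline per product.
    flow = deepcopy(base_flow)
    flow.name = f"COR: Meteorologia - Satelite GOES 16 - {product_label}"
    flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
    flow.run_config = KubernetesRun(
        image=constants.DOCKER_IMAGE.value,
        labels=[constants.RJ_COR_AGENT_LABEL.value],
    )
    flow.schedule = schedule
    return flow

# e.g.: make_goes16_variant(cor_meteorologia_goes16, "SST - Temperatura da superfície do oceano", sst)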
3 changes: 1 addition & 2 deletions pipelines/meteorologia/satelite/remap.py
@@ -9,8 +9,7 @@
import netCDF4 as nc
import numpy as np
from osgeo import gdal, osr # pylint: disable=E0401

from pipelines.utils.utils import log
from prefeitura_rio.pipelines_utils.logging import log


def extract_resolution(input_string: str):