Skip to content

Commit 5f31f0b

Browse files
authored
Merge branch 'master' into pre-commit-ci-update-config
2 parents 21cf6eb + 3e9b886 commit 5f31f0b

File tree

7 files changed

+289
-109
lines changed

7 files changed

+289
-109
lines changed

pipelines/rj_cor/meteorologia/precipitacao_alertario/constants.py

Lines changed: 141 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
class constants(Enum): # pylint: disable=c0103
1111
"""
1212
Constant values for the precipitacao_alertario project
13+
The constants for actual values are on rain_dashboard_constants file
1314
"""
1415

1516
RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS = {
@@ -18,12 +19,12 @@ class constants(Enum): # pylint: disable=c0103
1819
"query_data": """
1920
WITH
2021
last_update_date AS (
21-
SELECT
22-
CAST(MAX(data_particao) AS DATETIME) AS last_update
23-
FROM `rj-cor.clima_pluviometro.taxa_precipitacao_alertario`
24-
WHERE data_particao >= DATE_SUB(CURRENT_DATETIME('America/Sao_Paulo'), INTERVAL 2 DAY)
25-
),
26-
alertario AS ( -- seleciona as últimas 2h de medição antes da última atualização
22+
SELECT
23+
CAST(MAX(data_particao) AS DATETIME) AS last_update
24+
FROM `rj-cor.clima_pluviometro.taxa_precipitacao_alertario`
25+
WHERE data_particao >= DATE_SUB(CURRENT_DATETIME('America/Sao_Paulo'), INTERVAL 2 DAY)
26+
),
27+
alertario AS ( -- seleciona as últimas 2h de medição do alertario antes da última atualização
2728
SELECT
2829
id_estacao,
2930
acumulado_chuva_15_min,
@@ -36,39 +37,97 @@ class constants(Enum): # pylint: disable=c0103
3637
AND CAST(CONCAT(data_particao, " ", horario) AS DATETIME) >= DATE_SUB(lup.last_update, INTERVAL 2 HOUR)
3738
),
3839
39-
last_measurements AS (-- soma a quantidade chuva das últimas 2h
40-
SELECT
41-
a.id_estacao,
42-
"alertario" AS sistema,
43-
MAX(a.data_update) AS data_update,
44-
SUM(a.acumulado_chuva_15_min) AS acumulado_chuva_15_min,
45-
FROM alertario a
46-
GROUP BY a.id_estacao, sistema
40+
websirene AS ( -- seleciona as últimas 2h de medição do websirene antes da última atualização
41+
SELECT
42+
id_estacao,
43+
acumulado_chuva_15_min,
44+
CURRENT_DATE('America/Sao_Paulo') as data,
45+
data_particao,
46+
DATETIME(CONCAT(data_particao," ", horario)) AS data_update,
47+
FROM `rj-cor.clima_pluviometro.taxa_precipitacao_websirene`
48+
INNER JOIN last_update_date lup ON 1=1
49+
WHERE data_particao >= DATE_SUB(CURRENT_DATE('America/Sao_Paulo'), INTERVAL 2 DAY)
50+
AND CAST(CONCAT(data_particao, " ", horario) AS DATETIME) >= DATE_SUB(lup.last_update, INTERVAL 2 HOUR)
51+
),
52+
53+
cemaden AS ( -- seleciona as últimas 2h de medição do cemaden antes da última atualização
54+
SELECT
55+
id_estacao,
56+
acumulado_chuva_10_min acumulado_chuva_15_min,
57+
CURRENT_DATE('America/Sao_Paulo') as data,
58+
data_particao,
59+
DATETIME(data_medicao) AS data_update,
60+
FROM `rj-cor.clima_pluviometro.taxa_precipitacao_cemaden`
61+
INNER JOIN last_update_date lup ON 1=1
62+
WHERE data_particao >= DATE_SUB(CURRENT_DATE('America/Sao_Paulo'), INTERVAL 2 DAY)
63+
AND CAST(data_medicao AS DATETIME) >= DATE_SUB(lup.last_update, INTERVAL 2 HOUR)
64+
),
65+
66+
last_measurements AS (-- soma a quantidade chuva das últimas 2h e concatena medições do alertario, cemaden e websirene
67+
(SELECT
68+
id_estacao,
69+
"alertario" AS sistema,
70+
MAX(data_update) AS data_update,
71+
SUM(acumulado_chuva_15_min) AS acumulado_chuva_15_min,
72+
FROM alertario
73+
GROUP BY id_estacao, sistema)
74+
UNION ALL
75+
(SELECT
76+
id_estacao,
77+
"websirene" AS sistema,
78+
MAX(data_update) AS data_update,
79+
SUM(acumulado_chuva_15_min) AS acumulado_chuva_15_min,
80+
FROM websirene
81+
GROUP BY id_estacao, sistema)
82+
UNION ALL
83+
(SELECT
84+
id_estacao,
85+
"cemaden" AS sistema,
86+
MAX(data_update) AS data_update,
87+
SUM(acumulado_chuva_15_min) AS acumulado_chuva_15_min,
88+
FROM cemaden
89+
GROUP BY id_estacao, sistema)
4790
),
4891
4992
h3_chuvas AS ( -- calcula qnt de chuva para cada h3
50-
SELECT
51-
h3.*,
52-
lm.id_estacao,
53-
lm.acumulado_chuva_15_min,
54-
lm.acumulado_chuva_15_min/power(h3.dist,5) AS p1_15min,
55-
1/power(h3.dist,5) AS inv_dist
56-
FROM (
57-
WITH centroid_h3 AS (
58-
SELECT
59-
*,
60-
ST_CENTROID(geometry) AS geom
61-
FROM `rj-cor.dados_mestres.h3_grid_res8`
62-
),
93+
SELECT
94+
h3.*,
95+
lm.id_estacao,
96+
lm.acumulado_chuva_15_min,
97+
lm.acumulado_chuva_15_min/power(h3.dist,5) AS p1_15min,
98+
1/power(h3.dist,5) AS inv_dist
99+
FROM (
100+
WITH centroid_h3 AS (
101+
SELECT
102+
*,
103+
ST_CENTROID(geometry) AS geom
104+
FROM `rj-cor.dados_mestres.h3_grid_res8`
105+
),
63106
64-
estacoes_pluviometricas AS (
65-
SELECT
66-
id_estacao AS id,
67-
estacao,
68-
"alertario" AS sistema,
69-
ST_GEOGPOINT(CAST(longitude AS FLOAT64),
70-
CAST(latitude AS FLOAT64)) AS geom
71-
FROM `rj-cor.clima_pluviometro.estacoes_alertario`
107+
estacoes_pluviometricas AS (
108+
(SELECT
109+
id_estacao AS id,
110+
estacao,
111+
"alertario" AS sistema,
112+
ST_GEOGPOINT(CAST(longitude AS FLOAT64),
113+
CAST(latitude AS FLOAT64)) AS geom
114+
FROM `rj-cor.clima_pluviometro.estacoes_alertario`)
115+
UNION ALL
116+
(SELECT
117+
id_estacao AS id,
118+
estacao,
119+
"websirene" AS sistema,
120+
ST_GEOGPOINT(CAST(longitude AS FLOAT64),
121+
CAST(latitude AS FLOAT64)) AS geom
122+
FROM `rj-cor.clima_pluviometro.estacoes_websirene`)
123+
UNION ALL
124+
(SELECT
125+
id_estacao AS id,
126+
estacao,
127+
"cemaden" AS sistema,
128+
ST_GEOGPOINT(CAST(longitude AS FLOAT64),
129+
CAST(latitude AS FLOAT64)) AS geom
130+
FROM `rj-cor.clima_pluviometro.estacoes_cemaden`)
72131
),
73132
74133
estacoes_mais_proximas AS ( -- calcula distância das estações para cada centróide do h3
@@ -109,26 +168,26 @@ class constants(Enum): # pylint: disable=c0103
109168
),
110169
111170
h3_media AS ( -- calcula média de chuva para as 3 estações mais próximas
112-
SELECT
113-
id_h3,
114-
CAST(sum(p1_15min)/sum(inv_dist) AS DECIMAL) AS chuva_15min,
115-
STRING_AGG(estacao ORDER BY estacao) estacoes
116-
FROM h3_chuvas
117-
-- WHERE ranking < 4
118-
GROUP BY id_h3
171+
SELECT
172+
id_h3,
173+
CAST(sum(p1_15min)/sum(inv_dist) AS DECIMAL) AS chuva_15min,
174+
STRING_AGG(estacao ORDER BY estacao) estacoes
175+
FROM h3_chuvas
176+
-- WHERE ranking < 4
177+
GROUP BY id_h3
119178
),
120179
121180
final_table AS (
122-
SELECT
123-
h3_media.id_h3,
124-
h3_media.estacoes,
125-
nome AS bairro,
126-
cast(round(h3_media.chuva_15min,2) AS decimal) AS chuva_15min,
127-
FROM h3_media
128-
LEFT JOIN `rj-cor.dados_mestres.h3_grid_res8` h3_grid
129-
ON h3_grid.id=h3_media.id_h3
130-
LEFT JOIN `rj-cor.dados_mestres.bairro`
131-
ON ST_CONTAINS(`rj-cor.dados_mestres.bairro`.geometry, ST_CENTROID(h3_grid.geometry))
181+
SELECT
182+
h3_media.id_h3,
183+
h3_media.estacoes,
184+
nome AS bairro,
185+
cast(round(h3_media.chuva_15min,2) AS decimal) AS chuva_15min,
186+
FROM h3_media
187+
LEFT JOIN `rj-cor.dados_mestres.h3_grid_res8` h3_grid
188+
ON h3_grid.id=h3_media.id_h3
189+
LEFT JOIN `rj-cor.dados_mestres.bairro`
190+
ON ST_CONTAINS(`rj-cor.dados_mestres.bairro`.geometry, ST_CENTROID(h3_grid.geometry))
132191
)
133192
134193
SELECT
@@ -144,22 +203,41 @@ class constants(Enum): # pylint: disable=c0103
144203
ELSE 'sem chuva'
145204
END AS status,
146205
CASE
147-
WHEN chuva_15min> 0 AND chuva_15min<= 10 THEN '#DAECFB'--'#00CCFF'
148-
WHEN chuva_15min> 1 AND chuva_15min<= 50 THEN '#A9CBE8'--'#BFA230'
149-
WHEN chuva_15min> 50 AND chuva_15min<= 100 THEN '#77A9D5'--'#E0701F'
150-
WHEN chuva_15min> 100 THEN '#125999'--'#FF0000'
206+
WHEN chuva_15min> 0 AND chuva_15min<= 10 THEN '#DAECFB'
207+
WHEN chuva_15min> 1 AND chuva_15min<= 50 THEN '#A9CBE8'
208+
WHEN chuva_15min> 50 AND chuva_15min<= 100 THEN '#77A9D5'
209+
WHEN chuva_15min> 100 THEN '#125999'
151210
ELSE '#ffffff'
152211
END AS color
153212
FROM final_table
154213
""",
155214
"query_update": """
215+
WITH datas AS (
216+
(SELECT
217+
MAX(
218+
DATETIME(
219+
CONCAT(data_particao," ", horario)
220+
)
221+
) AS last_update
222+
FROM `rj-cor.clima_pluviometro.taxa_precipitacao_alertario`
223+
WHERE data_particao> DATE_SUB(CURRENT_DATE('America/Sao_Paulo'), INTERVAL 2 DAY))
224+
UNION ALL
225+
(SELECT
226+
MAX(
227+
DATETIME(
228+
CONCAT(data_particao," ", horario)
229+
)
230+
) AS last_update
231+
FROM `rj-cor.clima_pluviometro.taxa_precipitacao_websirene`
232+
WHERE data_particao> DATE_SUB(CURRENT_DATE('America/Sao_Paulo'), INTERVAL 2 DAY))
233+
UNION ALL
234+
(SELECT
235+
MAX(DATETIME(data_medicao)) AS last_update
236+
FROM `rj-cor.clima_pluviometro.taxa_precipitacao_cemaden`
237+
WHERE data_particao> DATE_SUB(CURRENT_DATE('America/Sao_Paulo'), INTERVAL 2 DAY))
238+
)
156239
SELECT
157-
MAX(
158-
DATETIME(
159-
CONCAT(data_particao," ", horario)
160-
)
161-
) AS last_update
162-
FROM `rj-cor.clima_pluviometro.taxa_precipitacao_alertario`
163-
WHERE data_particao> DATE_SUB(CURRENT_DATE('America/Sao_Paulo'), INTERVAL 2 DAY)
240+
MAX(last_update) AS last_update
241+
FROM datas
164242
""",
165243
}

pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -127,41 +127,39 @@ def tratar_dados(
127127

128128
# dados.at[index, "data_medicao"] = date.strftime("%Y-%m-%d %H:%M:%S")
129129

130-
see_cols = ["id_estacao", "data_medicao", "last_update"]
131-
log(
132-
f"Dataframe after comparing with last data saved on redis {dados[see_cols].head()}"
133-
)
134-
dados["data_medicao"] = dados["data_medicao"].dt.strftime("%Y-%m-%d %H:%M:%S")
135-
log(f"Dataframe after converting to string {dados[see_cols].head()}")
136-
137-
if dados.shape[0] > 0:
138-
log(f"Dataframe after comparing with last data saved on redis {dados.iloc[0]}")
139-
140130
empty_data = dados.shape[0] == 0
141131

142-
# Save max date on redis to compare this with last dbt run
143132
if not empty_data:
133+
see_cols = ["id_estacao", "data_medicao", "last_update"]
134+
log(
135+
f"Dataframe after comparing with last data saved on redis {dados[see_cols].head()}"
136+
)
137+
log(f"Dataframe first row after comparing {dados.iloc[0]}")
138+
dados["data_medicao"] = dados["data_medicao"].dt.strftime("%Y-%m-%d %H:%M:%S")
139+
log(f"Dataframe after converting to string {dados[see_cols].head()}")
140+
141+
# Save max date on redis to compare this with last dbt run
144142
max_date = str(dados["data_medicao"].max())
145143
redis_key = build_redis_key(dataset_id, table_id, name="last_update", mode=mode)
146144
log(f"Dataframe is not empty. Redis key: {redis_key} and new date: {max_date}")
147145
save_str_on_redis(redis_key, "date", max_date)
146+
147+
# Fix columns order
148+
dados = dados[
149+
[
150+
"data_medicao",
151+
"id_estacao",
152+
"acumulado_chuva_15_min",
153+
"acumulado_chuva_1_h",
154+
"acumulado_chuva_4_h",
155+
"acumulado_chuva_24_h",
156+
"acumulado_chuva_96_h",
157+
]
158+
]
148159
else:
149160
# If df is empty stop flow on flows.py
150161
log("Dataframe is empty. Skipping update flow.")
151162

152-
# Fixar ordem das colunas
153-
dados = dados[
154-
[
155-
"data_medicao",
156-
"id_estacao",
157-
"acumulado_chuva_15_min",
158-
"acumulado_chuva_1_h",
159-
"acumulado_chuva_4_h",
160-
"acumulado_chuva_24_h",
161-
"acumulado_chuva_96_h",
162-
]
163-
]
164-
165163
return dados, empty_data
166164

167165

@@ -175,10 +173,13 @@ def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
175173
prepath.mkdir(parents=True, exist_ok=True)
176174

177175
partition_column = "data_medicao"
176+
log(f"Dataframe before partitions {dados.iloc[0]}")
177+
log(f"Dataframe before partitions {dados.dtypes}")
178178
dataframe, partitions = parse_date_columns(dados, partition_column)
179179
current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")
180+
log(f"Dataframe after partitions {dataframe.iloc[0]}")
181+
log(f"Dataframe after partitions {dataframe.dtypes}")
180182

181-
# Cria partições a partir da data
182183
to_partitions(
183184
data=dataframe,
184185
partition_columns=partitions,

pipelines/rj_cor/meteorologia/satelite/flows.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
create_image = Parameter("create_image", default=False, required=False)
7373

7474
# Starting tasks
75-
current_time = get_dates(current_time)
75+
current_time = get_dates(current_time, product)
7676

7777
date_hour_info = slice_data(current_time=current_time, ref_filename=ref_filename)
7878

@@ -156,7 +156,7 @@
156156
image=constants.DOCKER_IMAGE.value,
157157
labels=[constants.RJ_COR_AGENT_LABEL.value],
158158
)
159-
cor_meteorologia_goes16.schedule = rrqpe
159+
cor_meteorologia_goes16_rrqpe.schedule = rrqpe
160160

161161
cor_meteorologia_goes16_tpw = deepcopy(cor_meteorologia_goes16)
162162
cor_meteorologia_goes16_tpw.name = "COR: Meteorologia - Satelite GOES 16 - TPW"

pipelines/rj_cor/meteorologia/satelite/remap.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import netCDF4 as nc
99
import numpy as np
1010
from osgeo import osr, gdal # pylint: disable=E0401
11+
from pipelines.utils.utils import log
1112

1213

1314
def extract_resolution(input_string: str):
@@ -66,6 +67,7 @@ def remap(
6667
Converte coordenada X, Y para latlon
6768
"""
6869
# Open the file
70+
log(f"Remaping NETCDF:{path}:{variable}")
6971
img = gdal.Open(f"NETCDF:{path}:" + variable)
7072

7173
# Read the header metadata

0 commit comments

Comments
 (0)