Skip to content

Commit 2f5874d

Browse files
authored
Merge branch 'master' into pre-commit-ci-update-config
2 parents 721312b + fc2a667 commit 2f5874d

File tree

6 files changed

+541
-1
lines changed

6 files changed

+541
-1
lines changed

pipelines/rj_cor/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from pipelines.rj_cor.meteorologia.meteorologia_redemet.flows import *
88
from pipelines.rj_cor.meteorologia.precipitacao_alertario.flows import *
99
from pipelines.rj_cor.meteorologia.precipitacao_cemaden.flows import *
10+
from pipelines.rj_cor.meteorologia.precipitacao_inea.flows import *
1011
from pipelines.rj_cor.meteorologia.satelite.flows import *
1112
from pipelines.rj_cor.meteorologia.precipitacao_websirene.flows import *
1213
from pipelines.rj_cor.meteorologia.radar.precipitacao.flows import *
pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
# -*- coding: utf-8 -*-
2+
# pylint: disable=C0103
3+
"""
4+
Flows for precipitacao_inea.
5+
"""
6+
from datetime import timedelta

from prefect import case, Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run
from prefect.tasks.templates import StringFormatter

from pipelines.constants import constants
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.custom import wait_for_flow_run_with_timeout
from pipelines.rj_cor.meteorologia.precipitacao_inea.tasks import (
    check_for_new_stations,
    check_new_data,
    download_data,
    treat_data,
    save_data,
    wait_task,
)
from pipelines.rj_cor.meteorologia.precipitacao_inea.schedules import (
    minute_schedule,
)
from pipelines.utils.decorators import Flow
from pipelines.utils.dump_db.constants import constants as dump_db_constants
from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants
from pipelines.utils.tasks import (
    create_table_and_upload_to_gcs,
    get_current_flow_labels,
)
34+
35+
# Waiter used for the child flow runs (materialization and dump to GCS) that the
# flow below triggers. It is built with a two-minute timeout; presumably that
# bounds how long a single wait may block — confirm in pipelines.utils.custom.
_CHILD_FLOW_RUN_TIMEOUT = timedelta(minutes=2)
wait_for_flow_run_with_2min_timeout = wait_for_flow_run_with_timeout(
    timeout=_CHILD_FLOW_RUN_TIMEOUT
)
38+
39+
# Run-name templates for the child flow runs. They are rendered by a task at run
# time: interpolating the Parameter objects with an f-string while the flow is
# being built would embed their repr ("<Parameter: ...>") in the run name instead
# of the values the run was actually started with.
_format_materialization_run_name = StringFormatter(
    name="Format materialization run name",
    template="Materialize {dataset_id}.{table_id}",
)
_format_dump_to_gcs_run_name = StringFormatter(
    name="Format dump to GCS run name",
    template="Dump to GCS {dataset_id}.{table_id}",
)

# Flow outline:
#   1. download the INEA data and split it into a pluviometric and a
#      fluviometric dataframe;
#   2. for each of the two, if `check_new_data` reports new rows, save them,
#      upload them to their table and optionally trigger the DBT
#      materialization flow followed by the dump-to-GCS flow;
#   3. check the downloaded data for new stations.
with Flow(
    name="COR: Meteorologia - Precipitacao e Fluviometria INEA",
    code_owners=[
        "paty",
    ],
    # skip_if_running=True,
) as cor_meteorologia_precipitacao_inea:
    # Dump mode and destination tables for both kinds of measurement.
    DUMP_MODE = Parameter("dump_mode", default="append", required=True)
    DATASET_ID_PLUVIOMETRIC = Parameter(
        "dataset_id_pluviometric", default="clima_pluviometro", required=True
    )
    TABLE_ID_PLUVIOMETRIC = Parameter(
        "table_id_pluviometric", default="taxa_precipitacao_inea", required=True
    )
    DATASET_ID_FLUVIOMETRIC = Parameter(
        "dataset_id_fluviometric", default="clima_fluviometro", required=True
    )
    TABLE_ID_FLUVIOMETRIC = Parameter(
        "table_id_fluviometric", default="lamina_agua_inea", required=True
    )

    # Materialization parameters
    MATERIALIZE_AFTER_DUMP = Parameter(
        "materialize_after_dump", default=True, required=False
    )
    MATERIALIZE_TO_DATARIO = Parameter(
        "materialize_to_datario", default=True, required=False
    )
    MATERIALIZATION_MODE = Parameter("mode", default="prod", required=False)

    # Dump to GCS after? Should only dump to GCS if materializing to datario
    DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False)

    MAXIMUM_BYTES_PROCESSED = Parameter(
        "maximum_bytes_processed",
        required=False,
        default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
    )

    dataframe = download_data()
    # NOTE(review): only the pluviometric dataset/table ids are handed to
    # `treat_data`, although it returns both dataframes — confirm this is intended.
    dfr_pluviometric, dfr_fluviometric = treat_data(
        dataframe=dataframe,
        dataset_id=DATASET_ID_PLUVIOMETRIC,
        table_id=TABLE_ID_PLUVIOMETRIC,
        mode=MATERIALIZATION_MODE,
    )
    new_pluviometric_data, new_fluviometric_data = check_new_data(
        dfr_pluviometric, dfr_fluviometric
    )

    # ---------------------------------------------------------------- pluviometric
    with case(new_pluviometric_data, True):
        path_pluviometric = save_data(
            dataframe=dfr_pluviometric, folder_name="pluviometer"
        )

        # Create pluviometric table in BigQuery
        UPLOAD_TABLE_PLUVIOMETRIC = create_table_and_upload_to_gcs(
            data_path=path_pluviometric,
            dataset_id=DATASET_ID_PLUVIOMETRIC,
            table_id=TABLE_ID_PLUVIOMETRIC,
            dump_mode=DUMP_MODE,
            wait=path_pluviometric,
        )

        # Trigger pluviometric DBT flow run
        with case(MATERIALIZE_AFTER_DUMP, True):
            current_flow_labels = get_current_flow_labels()
            materialization_run_name = _format_materialization_run_name(
                dataset_id=DATASET_ID_PLUVIOMETRIC,
                table_id=TABLE_ID_PLUVIOMETRIC,
            )
            materialization_flow = create_flow_run(
                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
                parameters={
                    "dataset_id": DATASET_ID_PLUVIOMETRIC,
                    "table_id": TABLE_ID_PLUVIOMETRIC,
                    "mode": MATERIALIZATION_MODE,
                    "materialize_to_datario": MATERIALIZE_TO_DATARIO,
                },
                labels=current_flow_labels,
                run_name=materialization_run_name,
            )

            materialization_flow.set_upstream(current_flow_labels)

            wait_for_materialization = wait_for_flow_run_with_2min_timeout(
                flow_run_id=materialization_flow,
                stream_states=True,
                stream_logs=True,
                raise_final_state=True,
            )
            # Retry settings for the waiter come from the shared dump_db constants.
            wait_for_materialization.max_retries = (
                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
            )
            wait_for_materialization.retry_delay = timedelta(
                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
            )

            with case(DUMP_TO_GCS, True):
                # Trigger Dump to GCS flow run with project id as datario
                dump_to_gcs_run_name = _format_dump_to_gcs_run_name(
                    dataset_id=DATASET_ID_PLUVIOMETRIC,
                    table_id=TABLE_ID_PLUVIOMETRIC,
                )
                dump_to_gcs_flow = create_flow_run(
                    flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
                    project_name=constants.PREFECT_DEFAULT_PROJECT.value,
                    parameters={
                        "project_id": "datario",
                        "dataset_id": DATASET_ID_PLUVIOMETRIC,
                        "table_id": TABLE_ID_PLUVIOMETRIC,
                        "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
                    },
                    labels=[
                        "datario",
                    ],
                    run_name=dump_to_gcs_run_name,
                )
                # The dump only starts once the materialization wait has finished.
                dump_to_gcs_flow.set_upstream(wait_for_materialization)

                wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout(
                    flow_run_id=dump_to_gcs_flow,
                    stream_states=True,
                    stream_logs=True,
                    raise_final_state=True,
                )

    # Ordering barrier: the fluviometric save below is made downstream of the
    # pluviometric upload through this task.
    # NOTE(review): the upload lives inside a `case` branch, so whether the
    # fluviometric branch still runs when that branch is skipped depends on how
    # `wait_task` treats skipped upstreams — confirm in tasks.py.
    status = wait_task()
    status.set_upstream(UPLOAD_TABLE_PLUVIOMETRIC)

    # ---------------------------------------------------------------- fluviometric
    with case(new_fluviometric_data, True):
        path_fluviometric = save_data(
            dataframe=dfr_fluviometric, folder_name="fluviometer"
        )
        path_fluviometric.set_upstream(status)

        # Create fluviometric table in BigQuery
        UPLOAD_TABLE_FLUVIOMETRIC = create_table_and_upload_to_gcs(
            data_path=path_fluviometric,
            dataset_id=DATASET_ID_FLUVIOMETRIC,
            table_id=TABLE_ID_FLUVIOMETRIC,
            dump_mode=DUMP_MODE,
            wait=path_fluviometric,
        )

        # Trigger DBT flow run
        with case(MATERIALIZE_AFTER_DUMP, True):
            current_flow_labels = get_current_flow_labels()
            materialization_run_name = _format_materialization_run_name(
                dataset_id=DATASET_ID_FLUVIOMETRIC,
                table_id=TABLE_ID_FLUVIOMETRIC,
            )
            materialization_flow = create_flow_run(
                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
                parameters={
                    "dataset_id": DATASET_ID_FLUVIOMETRIC,
                    "table_id": TABLE_ID_FLUVIOMETRIC,
                    "mode": MATERIALIZATION_MODE,
                    "materialize_to_datario": MATERIALIZE_TO_DATARIO,
                },
                labels=current_flow_labels,
                run_name=materialization_run_name,
            )

            materialization_flow.set_upstream(current_flow_labels)

            wait_for_materialization = wait_for_flow_run_with_2min_timeout(
                flow_run_id=materialization_flow,
                stream_states=True,
                stream_logs=True,
                raise_final_state=True,
            )
            # Retry settings for the waiter come from the shared dump_db constants.
            wait_for_materialization.max_retries = (
                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
            )
            wait_for_materialization.retry_delay = timedelta(
                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
            )

            with case(DUMP_TO_GCS, True):
                # Trigger Dump to GCS flow run with project id as datario
                dump_to_gcs_run_name = _format_dump_to_gcs_run_name(
                    dataset_id=DATASET_ID_FLUVIOMETRIC,
                    table_id=TABLE_ID_FLUVIOMETRIC,
                )
                dump_to_gcs_flow = create_flow_run(
                    flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
                    project_name=constants.PREFECT_DEFAULT_PROJECT.value,
                    parameters={
                        "project_id": "datario",
                        "dataset_id": DATASET_ID_FLUVIOMETRIC,
                        "table_id": TABLE_ID_FLUVIOMETRIC,
                        "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
                    },
                    labels=[
                        "datario",
                    ],
                    run_name=dump_to_gcs_run_name,
                )
                # The dump only starts once the materialization wait has finished.
                dump_to_gcs_flow.set_upstream(wait_for_materialization)

                wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout(
                    flow_run_id=dump_to_gcs_flow,
                    stream_states=True,
                    stream_logs=True,
                    raise_final_state=True,
                )

    # NOTE(review): this task also waits on the pluviometric upload, so the same
    # skip-propagation question as for `wait_task` applies — confirm in tasks.py.
    check_for_new_stations(dataframe, wait=UPLOAD_TABLE_PLUVIOMETRIC)
233+
234+
# Settings for running the flow in the cloud: attach the schedule, run it on
# Kubernetes with the project's Docker image and the RJ-COR agent label, and
# store the flow in the GCS flows bucket.
cor_meteorologia_precipitacao_inea.schedule = minute_schedule
cor_meteorologia_precipitacao_inea.run_config = KubernetesRun(
    labels=[constants.RJ_COR_AGENT_LABEL.value],
    image=constants.DOCKER_IMAGE.value,
)
cor_meteorologia_precipitacao_inea.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
pipelines/rj_cor/meteorologia/precipitacao_inea/schedules.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# -*- coding: utf-8 -*-
2+
# pylint: disable=C0103
3+
"""
4+
Schedules for precipitacao_inea
5+
Runs every 5 minutes.
6+
"""
7+
from datetime import timedelta, datetime
8+
from prefect.schedules import Schedule
9+
from prefect.schedules.clocks import IntervalClock
10+
from pipelines.constants import constants
11+
12+
# Single clock driving the INEA flow. Despite the schedule's name, it fires every
# five minutes, counted from a naive start date of 2023-01-01 00:01:00.
_inea_clock = IntervalClock(
    interval=timedelta(minutes=5),
    start_date=datetime(year=2023, month=1, day=1, hour=0, minute=1, second=0),
    labels=[constants.RJ_COR_AGENT_LABEL.value],
    # Parameter values handed to every run started by this clock.
    parameter_defaults={
        # NOTE(review): "trigger_rain_dashboard_update" was left commented out in
        # the original — confirm whether it should be enabled or dropped.
        # "trigger_rain_dashboard_update": True,
        "materialize_after_dump": True,
        "mode": "prod",
        "materialize_to_datario": True,
        "dump_to_gcs": False,
        "dump_mode": "append",
        "dataset_id_pluviometric": "clima_pluviometro",
        "table_id_pluviometric": "taxa_precipitacao_inea",
        "dataset_id_fluviometric": "clima_fluviometro",
        "table_id_fluviometric": "lamina_agua_inea",
    },
)

minute_schedule = Schedule(clocks=[_inea_clock])

0 commit comments

Comments
 (0)