Skip to content

Commit 0dd55aa

Browse files
authored
Merge branch 'master' into pre-commit-ci-update-config
2 parents cb13f77 + 5f6589a commit 0dd55aa

File tree

2 files changed

+78
-62
lines changed

2 files changed

+78
-62
lines changed

pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py

Lines changed: 68 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
# BILHETAGEM INTEGRAÇÃO - CAPTURA A CADA MINUTO #
6767

6868
bilhetagem_integracao_captura = deepcopy(default_capture_flow)
69-
bilhetagem_integracao_captura.name = "SMTR: Bilhetagem Integração - Captura"
69+
bilhetagem_integracao_captura.name = "SMTR: Bilhetagem Integração - Captura (subflow)"
7070
bilhetagem_integracao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
7171
bilhetagem_integracao_captura.run_config = KubernetesRun(
7272
image=emd_constants.DOCKER_IMAGE.value,
@@ -79,7 +79,6 @@
7979
| constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
8080
)
8181

82-
bilhetagem_integracao_captura.schedule = every_minute
8382

8483
# BILHETAGEM GPS - CAPTURA A CADA 5 MINUTOS #
8584

@@ -295,23 +294,6 @@
295294
raise_final_state=True,
296295
)
297296

298-
# Recaptura Integração
299-
300-
run_recaptura_integracao = create_flow_run(
301-
flow_name=bilhetagem_recaptura.name,
302-
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
303-
labels=LABELS,
304-
parameters=constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
305-
upstream_tasks=[wait_recaptura_transacao_true],
306-
)
307-
308-
wait_recaptura_integracao_true = wait_for_flow_run(
309-
run_recaptura_integracao,
310-
stream_states=True,
311-
stream_logs=True,
312-
raise_final_state=True,
313-
)
314-
315297
# Captura Auxiliar
316298

317299
runs_captura = create_flow_run.map(
@@ -321,7 +303,7 @@
321303
labels=unmapped(LABELS),
322304
)
323305

324-
runs_captura.set_upstream(wait_recaptura_integracao_true)
306+
runs_captura.set_upstream(wait_recaptura_transacao_true)
325307

326308
wait_captura_true = wait_for_flow_run.map(
327309
runs_captura,
@@ -353,9 +335,8 @@
353335
wait_captura_false,
354336
wait_recaptura_auxiliar_false,
355337
wait_recaptura_transacao_false,
356-
wait_recaptura_integracao_false,
357338
) = task(
358-
lambda: [None, None, None, None], name="assign_none_to_capture_runs", nout=4
339+
lambda: [None, None, None], name="assign_none_to_capture_runs", nout=3
359340
)()
360341

361342
wait_captura = merge(wait_captura_false, wait_captura_true)
@@ -365,9 +346,6 @@
365346
wait_recaptura_transacao = merge(
366347
wait_recaptura_transacao_false, wait_recaptura_transacao_true
367348
)
368-
wait_recaptura_integracao = merge(
369-
wait_recaptura_integracao_false, wait_recaptura_integracao_true
370-
)
371349

372350
with case(materialize, True):
373351
materialize_timestamp = get_current_timestamp(
@@ -382,7 +360,6 @@
382360
wait_captura,
383361
wait_recaptura_auxiliar,
384362
wait_recaptura_transacao,
385-
wait_recaptura_integracao,
386363
],
387364
parameters={
388365
"timestamp": materialize_timestamp,
@@ -396,31 +373,12 @@
396373
raise_final_state=True,
397374
)
398375

399-
run_materializacao_integracao = create_flow_run(
400-
flow_name=bilhetagem_materializacao_integracao.name,
401-
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
402-
labels=LABELS,
403-
upstream_tasks=[
404-
wait_materializacao_transacao,
405-
],
406-
parameters={
407-
"timestamp": materialize_timestamp,
408-
},
409-
)
410-
411-
wait_materializacao_integracao = wait_for_flow_run(
412-
run_materializacao_integracao,
413-
stream_states=True,
414-
stream_logs=True,
415-
raise_final_state=True,
416-
)
417-
418376
run_materializacao_gps_validador = create_flow_run(
419377
flow_name=bilhetagem_materializacao_gps_validador.name,
420378
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
421379
labels=LABELS,
422380
upstream_tasks=[
423-
wait_materializacao_integracao,
381+
wait_materializacao_transacao,
424382
],
425383
parameters={
426384
"timestamp": materialize_timestamp,
@@ -478,6 +436,20 @@
478436
raise_final_state=unmapped(True),
479437
)
480438

439+
runs_captura_integracao = create_flow_run(
440+
flow_name=unmapped(bilhetagem_integracao_captura.name),
441+
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
442+
labels=unmapped(LABELS),
443+
upstream_tasks=[wait_captura],
444+
)
445+
446+
wait_captura_integracao = wait_for_flow_run(
447+
runs_captura_integracao,
448+
stream_states=unmapped(True),
449+
stream_logs=unmapped(True),
450+
raise_final_state=unmapped(True),
451+
)
452+
481453
# Recaptura #
482454

483455
runs_recaptura = create_flow_run.map(
@@ -496,23 +468,48 @@
496468
raise_final_state=unmapped(True),
497469
)
498470

471+
# Recaptura Integração
472+
473+
run_recaptura_integracao = create_flow_run(
474+
flow_name=bilhetagem_recaptura.name,
475+
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
476+
labels=LABELS,
477+
parameters=constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
478+
upstream_tasks=[wait_recaptura_true, wait_captura_integracao],
479+
)
480+
481+
wait_recaptura_integracao_true = wait_for_flow_run(
482+
run_recaptura_integracao,
483+
stream_states=True,
484+
stream_logs=True,
485+
raise_final_state=True,
486+
)
487+
499488
with case(capture, False):
500-
wait_recaptura_false = task(lambda: None, name="assign_none_to_recapture")()
489+
wait_recaptura_false, wait_recaptura_integracao_false = task(
490+
lambda: [None, None], name="assign_none_to_recapture", nout=2
491+
)()
501492

502493
wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false)
494+
wait_recaptura_integracao = merge(
495+
wait_recaptura_integracao_true, wait_recaptura_integracao_false
496+
)
503497

504498
# Materialização #
505499

506500
with case(materialize, True):
501+
materialize_timestamp = get_current_timestamp(
502+
timestamp=timestamp,
503+
return_str=True,
504+
)
505+
507506
run_materializacao = create_flow_run(
508507
flow_name=bilhetagem_materializacao_ordem_pagamento.name,
509508
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
510509
labels=LABELS,
511-
upstream_tasks=[wait_recaptura],
510+
upstream_tasks=[wait_recaptura, wait_recaptura_integracao],
512511
parameters={
513-
"timestamp": get_current_timestamp(
514-
timestamp=timestamp, return_str=True
515-
),
512+
"timestamp": materialize_timestamp,
516513
},
517514
)
518515

@@ -523,8 +520,27 @@
523520
raise_final_state=True,
524521
)
525522

523+
run_materializacao_integracao = create_flow_run(
524+
flow_name=bilhetagem_materializacao_integracao.name,
525+
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
526+
labels=LABELS,
527+
upstream_tasks=[
528+
wait_materializacao,
529+
],
530+
parameters={
531+
"timestamp": materialize_timestamp,
532+
},
533+
)
534+
535+
wait_materializacao_integracao = wait_for_flow_run(
536+
run_materializacao_integracao,
537+
stream_states=True,
538+
stream_logs=True,
539+
raise_final_state=True,
540+
)
541+
526542
bilhetagem_ordem_pagamento_captura_tratamento.set_reference_tasks(
527-
[wait_materializacao, wait_recaptura]
543+
[wait_materializacao_integracao, wait_recaptura]
528544
)
529545

530546
bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS(

pipelines/rj_smtr/constants.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ class constants(Enum): # pylint: disable=c0103
227227
""",
228228
},
229229
"primary_key": ["id"],
230-
"interval_minutes": 1,
230+
"interval_minutes": 1440,
231231
}
232232

233233
BILHETAGEM_TRACKING_CAPTURE_PARAMS = {
@@ -455,32 +455,32 @@ class constants(Enum): # pylint: disable=c0103
455455
},
456456
]
457457

458-
BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
458+
BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
459459
"dataset_id": BILHETAGEM_DATASET_ID,
460-
"table_id": BILHETAGEM_TRANSACAO_CAPTURE_PARAMS["table_id"],
460+
"table_id": "integracao",
461461
"upstream": True,
462462
"dbt_vars": {
463463
"date_range": {
464-
"table_run_datetime_column_name": "datetime_transacao",
465-
"delay_hours": 1,
464+
"table_run_datetime_column_name": "datetime_captura",
465+
"delay_hours": 0,
466466
},
467467
"version": {},
468468
},
469+
"exclude": "+operadoras +consorcios",
469470
}
470471

471-
BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
472+
BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
472473
"dataset_id": BILHETAGEM_DATASET_ID,
473-
"table_id": BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS["table_id"],
474+
"table_id": "passageiros_hora",
474475
"upstream": True,
475476
"dbt_vars": {
476477
"date_range": {
477-
"table_run_datetime_column_name": "datetime_captura",
478+
"table_run_datetime_column_name": "datetime_transacao",
478479
"delay_hours": 1,
479-
"table_alias": "integracao",
480480
},
481481
"version": {},
482482
},
483-
"exclude": "+diretorio_operadoras +diretorio_consorcios",
483+
"exclude": "integracao matriz_integracao",
484484
}
485485

486486
BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {

0 commit comments

Comments
 (0)