Skip to content

Commit

Permalink
cria recaptura bilhetagem
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Oct 17, 2023
1 parent 16ffff3 commit 55fbe34
Showing 1 changed file with 44 additions and 7 deletions.
51 changes: 44 additions & 7 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@

from pipelines.rj_smtr.constants import constants

from pipelines.rj_smtr.schedules import every_hour
from pipelines.rj_smtr.schedules import every_hour, every_minute


GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
"secret_path": constants.BILHETAGEM_SECRET_PATH.value,
"source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
}

# Flows #

Expand All @@ -52,7 +59,25 @@
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
bilhetagem_transacao_captura.schedule = bilhetagem_transacao_schedule

bilhetagem_transacao_captura = set_default_parameters(
flow=bilhetagem_transacao_captura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
| constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
)

bilhetagem_transacao_captura.schedule = every_minute

bilhetagem_transacao_recaptura = deepcopy(default_capture_flow)
bilhetagem_transacao_recaptura.name = "SMTR: Bilhetagem Transação - Recaptura (subflow)"
bilhetagem_transacao_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_transacao_recaptura = set_default_parameters(
flow=bilhetagem_transacao_recaptura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
| constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value
| {"recapture": True},
)


# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #

Expand All @@ -66,11 +91,7 @@

bilhetagem_auxiliar_captura = set_default_parameters(
flow=bilhetagem_auxiliar_captura,
default_parameters={
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
"secret_path": constants.BILHETAGEM_SECRET_PATH.value,
"source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
},
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS,
)

# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO
Expand Down Expand Up @@ -106,12 +127,28 @@

LABELS = get_current_flow_labels()

# Recaptura Transações

run_recaptura = create_flow_run(
flow_name=bilhetagem_transacao_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
)

wait_recaptura = wait_for_flow_run(
run_recaptura,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

# Captura
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
upstream_tasks=[wait_recaptura],
)

wait_captura = wait_for_flow_run.map(
Expand Down

0 comments on commit 55fbe34

Please sign in to comment.