diff --git a/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html b/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html index 8722f6890..08c218b59 100644 --- a/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html +++ b/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html @@ -251,6 +251,29 @@
pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
default_parameters=bilhetagem_materializacao_integracao_parameters,
)
+
+# GPS Validador
+
+bilhetagem_materializacao_gps_validador = deepcopy(default_materialization_flow)
+bilhetagem_materializacao_gps_validador.name = (
+ "SMTR: Bilhetagem GPS Validador - Materialização (subflow)"
+)
+bilhetagem_materializacao_gps_validador.storage = GCS(
+ emd_constants.GCS_FLOWS_BUCKET.value
+)
+bilhetagem_materializacao_gps_validador.run_config = KubernetesRun(
+ image=emd_constants.DOCKER_IMAGE.value,
+ labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
+
+bilhetagem_materializacao_gps_validador = set_default_parameters(
+ flow=bilhetagem_materializacao_gps_validador,
+ default_parameters=constants.BILHETAGEM_MATERIALIZACAO_GPS_VALIDADOR_PARAMS.value,
+)
+
+bilhetagem_materializacao_gps_validador.state_handlers.append(skip_if_running_handler)
+
+
# RECAPTURA #
bilhetagem_recaptura = deepcopy(default_capture_flow)
@@ -421,6 +444,25 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
raise_final_state=True,
)
+ run_materializacao_gps_validador = create_flow_run(
+ flow_name=bilhetagem_materializacao_gps_validador.name,
+ project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
+ labels=LABELS,
+ upstream_tasks=[
+ wait_materializacao_integracao,
+ ],
+ parameters={
+ "timestamp": materialize_timestamp,
+ },
+ )
+
+ wait_materializacao_gps_validador = wait_for_flow_run(
+ run_materializacao_gps_validador,
+ stream_states=True,
+ stream_logs=True,
+ raise_final_state=True,
+ )
+
bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_transacao_tratamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
@@ -428,46 +470,6 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
)
bilhetagem_transacao_tratamento.schedule = every_hour
-
-# with Flow(
-# "SMTR: Bilhetagem GPS Validador - Tratamento",
-# code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
-# ) as bilhetagem_gps_tratamento:
-# timestamp = get_rounded_timestamp(
-# interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
-# )
-
-# rename_flow_run = rename_current_flow_run_now_time(
-# prefix=bilhetagem_transacao_tratamento.name + " ",
-# now_time=timestamp,
-# )
-
-# LABELS = get_current_flow_labels()
-
-# # Recaptura GPS
-
-# run_recaptura_gps = create_flow_run(
-# flow_name=bilhetagem_recaptura.name,
-# project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
-# labels=LABELS,
-# parameters=constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
-# )
-
-# wait_recaptura_gps = wait_for_flow_run(
-# run_recaptura_gps,
-# stream_states=True,
-# stream_logs=True,
-# raise_final_state=True,
-# )
-
-
-# bilhetagem_gps_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
-# bilhetagem_gps_tratamento.run_config = KubernetesRun(
-# image=emd_constants.DOCKER_IMAGE.value,
-# labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
-# )
-# # bilhetagem_gps_tratamento.schedule = every_hour
-
# CAPTURA/TRATAMENTO - ORDEM PAGAMENTO:
# CAPTURA + RECAPTURA + MATERIALIZAÇÃO
diff --git a/rj_smtr/constants.html b/rj_smtr/constants.html
index e61b29d41..6a7705a13 100644
--- a/rj_smtr/constants.html
+++ b/rj_smtr/constants.html
@@ -482,26 +482,6 @@ Module pipelines.rj_smtr.constants
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"save_bucket_name": BILHETAGEM_PRIVATE_BUCKET,
},
- {
- "table_id": "tipo_modal",
- "partition_date_only": True,
- "extract_params": {
- "database": "principal_db",
- "query": """
- SELECT
- *
- FROM
- TIPO_MODAL
- WHERE
- {update}
- """,
- "get_updates": ["cd_tipo_modal", "ds_tipo_modal"],
- },
- "primary_key": [
- "CD_TIPO_MODAL",
- ], # id column to nest data on
- "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
- },
]
BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
@@ -546,6 +526,20 @@ Module pipelines.rj_smtr.constants
},
}
+ BILHETAGEM_MATERIALIZACAO_GPS_VALIDADOR_PARAMS = {
+ "dataset_id": BILHETAGEM_DATASET_ID,
+ "table_id": "gps_validador",
+ "upstream": True,
+ "exclude": "+operadoras +consorcios",
+ "dbt_vars": {
+ "date_range": {
+ "table_run_datetime_column_name": "datetime_captura",
+ "delay_hours": 0,
+ },
+ "version": {},
+ },
+ }
+
BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"secret_path": BILHETAGEM_SECRET_PATH,
@@ -1280,26 +1274,6 @@ Classes
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"save_bucket_name": BILHETAGEM_PRIVATE_BUCKET,
},
- {
- "table_id": "tipo_modal",
- "partition_date_only": True,
- "extract_params": {
- "database": "principal_db",
- "query": """
- SELECT
- *
- FROM
- TIPO_MODAL
- WHERE
- {update}
- """,
- "get_updates": ["cd_tipo_modal", "ds_tipo_modal"],
- },
- "primary_key": [
- "CD_TIPO_MODAL",
- ], # id column to nest data on
- "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
- },
]
BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
@@ -1344,6 +1318,20 @@ Classes
},
}
+ BILHETAGEM_MATERIALIZACAO_GPS_VALIDADOR_PARAMS = {
+ "dataset_id": BILHETAGEM_DATASET_ID,
+ "table_id": "gps_validador",
+ "upstream": True,
+ "exclude": "+operadoras +consorcios",
+ "dbt_vars": {
+ "date_range": {
+ "table_run_datetime_column_name": "datetime_captura",
+ "delay_hours": 0,
+ },
+ "version": {},
+ },
+ }
+
BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"secret_path": BILHETAGEM_SECRET_PATH,
@@ -1637,6 +1625,10 @@ Class variables
+var BILHETAGEM_MATERIALIZACAO_GPS_VALIDADOR_PARAMS
+
+
+
var BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS
@@ -2101,6 +2093,7 @@ BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS
BILHETAGEM_GENERAL_CAPTURE_PARAMS
BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS
+BILHETAGEM_MATERIALIZACAO_GPS_VALIDADOR_PARAMS
BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS
BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS
BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS