From 4f5007ca04eab302643ee125c912783c1ff3ca0c Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 26 Oct 2023 11:04:04 -0300 Subject: [PATCH 01/27] adicionar tabelas de ressarcimento --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 21 +++++++- pipelines/rj_smtr/constants.py | 52 +++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 5b7a53bde..53b14d0d8 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -65,7 +65,7 @@ bilhetagem_transacao_captura.schedule = every_minute -# BILHETAGEM GPS +# BILHETAGEM GPS - CAPTURA A CADA MINUTO # bilhetagem_tracking_captura = deepcopy(default_capture_flow) bilhetagem_tracking_captura.name = "SMTR: Bilhetagem GPS Validador - Captura" @@ -83,6 +83,25 @@ bilhetagem_tracking_captura.schedule = every_minute +# BILHETAGEM RESSARCIMENTO - SUBFLOW PARA RODAR DIARIAMENTE # + +bilhetagem_ressarcimento_captura = deepcopy(default_capture_flow) +bilhetagem_ressarcimento_captura.name = ( + "SMTR: Bilhetagem Ressarcimento - Captura (subflow)" +) +bilhetagem_ressarcimento_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_ressarcimento_captura.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) + +bilhetagem_ressarcimento_captura = set_default_parameters( + flow=bilhetagem_ressarcimento_captura, + default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS + | constants.BILHETAGEM_RESSARCIMENTO_CAPTURE_PARAMS.value, +) + + # BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO # bilhetagem_auxiliar_captura = deepcopy(default_capture_flow) diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index cbb813106..fc577484a 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -184,6 +184,10 @@ class constants(Enum): # pylint: disable=c0103 "engine": "postgresql", "host": "10.5.15.25", }, + "ressarcimento_db": { + "engine": "postgresql", + "host": "10.5.15.127", + }, }, "source_type": "db", } @@ -226,6 +230,39 @@ class constants(Enum): # pylint: disable=c0103 "interval_minutes": 1, } + BILHETAGEM_RESSARCIMENTO_CAPTURE_PARAMS = [ + { + "table_id": "ordem_ressarcimento", + "partition_date_only": False, + "extract_params": { + "database": "ressarcimento_db", + "query": """ + SELECT + * + FROM + ordem_ressarcimento + """, + }, + "primary_key": ["id"], + "interval_minutes": 1, + }, + { + "table_id": "ordem_pagamento", + "partition_date_only": False, + "extract_params": { + "database": "ressarcimento_db", + "query": """ + SELECT + * + FROM + ordem_pagamento + """, + }, + "primary_key": ["id"], + "interval_minutes": 1, + }, + ] + BILHETAGEM_SECRET_PATH = "smtr_jae_access_data" BILHETAGEM_TRATAMENTO_INTERVAL = 60 @@ -306,6 +343,21 @@ class constants(Enum): # pylint: disable=c0103 ], # id column to nest data on "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, }, + { + "table_id": "dw_dm_operadora", + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + dw_dm_operadora + """, + }, + "primary_key": ["id_operadora"], # id column to nest data on + "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, + }, ] BILHETAGEM_MATERIALIZACAO_PARAMS = { From b0107fe0b6f2bbce8c3fc6b2e18bb4df5d009f76 Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 26 Oct 2023 11:34:56 -0300 Subject: [PATCH 02/27] criar flow ressarcimento --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 49 ++++++++++++++++--- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 53b14d0d8..5ccfe16c2 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -97,11 +97,9 @@ bilhetagem_ressarcimento_captura = set_default_parameters( flow=bilhetagem_ressarcimento_captura, - default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS - | constants.BILHETAGEM_RESSARCIMENTO_CAPTURE_PARAMS.value, + default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS, ) - # BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO # bilhetagem_auxiliar_captura = deepcopy(default_capture_flow) @@ -117,7 +115,8 @@ default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS, ) -# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO +# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO # + bilhetagem_materializacao = deepcopy(default_materialization_flow) bilhetagem_materializacao.name = "SMTR: Bilhetagem Transação - Materialização (subflow)" bilhetagem_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) @@ -143,7 +142,7 @@ default_parameters=bilhetagem_materializacao_parameters, ) -# RECAPTURA +# RECAPTURA # bilhetagem_recaptura = deepcopy(default_capture_flow) bilhetagem_recaptura.name = "SMTR: Bilhetagem - Recaptura (subflow)" @@ -153,7 +152,8 @@ default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS | {"recapture": True}, ) -# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO +# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO # + with Flow( "SMTR: Bilhetagem Transação - Tratamento", code_owners=["caio", "fernanda", "boris", "rodrigo"], @@ -311,3 +311,40 @@ labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) bilhetagem_gps_tratamento.schedule = every_hour + +with Flow( + "SMTR: Bilhetagem Ressarcimento - Captura/Tratamento", + code_owners=["caio", "fernanda", "boris", "rodrigo"], +) as bilhetagem_ressarcimento_captura_tratamento: + timestamp = get_rounded_timestamp( + interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value + ) + + rename_flow_run = rename_current_flow_run_now_time( + prefix=bilhetagem_ressarcimento_captura_tratamento.name + " ", + now_time=timestamp, + ) + + LABELS = get_current_flow_labels() + + runs_captura = create_flow_run.map( + flow_name=unmapped(bilhetagem_ressarcimento_captura.name), + project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, + labels=unmapped(LABELS), + ) + + wait_captura = wait_for_flow_run.map( + runs_captura, + stream_states=unmapped(True), + stream_logs=unmapped(True), + raise_final_state=unmapped(True), + ) + +bilhetagem_ressarcimento_captura_tratamento.storage = GCS( + emd_constants.GCS_FLOWS_BUCKET.value +) +bilhetagem_ressarcimento_captura_tratamento.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) From b234c005f4935c8b11d49de4fe0bac7a17caecff Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 26 Oct 2023 12:11:33 -0300 Subject: [PATCH 03/27] alterar partition --- pipelines/rj_smtr/constants.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index fc577484a..c5c111c07 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -233,7 +233,7 @@ class constants(Enum): # pylint: disable=c0103 BILHETAGEM_RESSARCIMENTO_CAPTURE_PARAMS = [ { "table_id": "ordem_ressarcimento", - "partition_date_only": False, + "partition_date_only": True, "extract_params": { "database": "ressarcimento_db", "query": """ @@ -248,7 +248,7 @@ class constants(Enum): # pylint: disable=c0103 }, { "table_id": "ordem_pagamento", - "partition_date_only": False, + "partition_date_only": True, "extract_params": { "database": "ressarcimento_db", "query": """ From 2bbbfa6e8ad11a89a33475efbd3adab5366c8f46 Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 26 Oct 2023 12:41:52 -0300 Subject: [PATCH 04/27] mudar projeto para staging --- pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 5ccfe16c2..5ccbe5abd 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -329,7 +329,8 @@ runs_captura = create_flow_run.map( flow_name=unmapped(bilhetagem_ressarcimento_captura.name), - project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped("staging"), parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) From 8309ceb99ecc4c466f9a27217ac7b57c1a3c8d61 Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 26 Oct 2023 12:48:38 -0300 Subject: [PATCH 05/27] adicionar serializacao de date --- pipelines/rj_smtr/utils.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index 389a31d54..b0430eee5 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -7,7 +7,7 @@ from ftplib import FTP from pathlib import Path -from datetime import timedelta, datetime +from datetime import timedelta, datetime, date from typing import List, Union, Any import traceback import io @@ -472,12 +472,12 @@ def custom_serialization(obj: Any) -> Any: Returns: Any: Serialized object """ - if isinstance(obj, pd.Timestamp): - if obj.tzinfo is None: - obj = obj.tz_localize("UTC").tz_convert( - emd_constants.DEFAULT_TIMEZONE.value - ) - + if isinstance(obj, (pd.Timestamp, date)): + if isinstance(obj, pd.Timestamp): + if obj.tzinfo is None: + obj = obj.tz_localize("UTC").tz_convert( + emd_constants.DEFAULT_TIMEZONE.value + ) return obj.isoformat() raise TypeError(f"Object of type {type(obj)} is not JSON serializable") From 431ff174717d1cc231904474485c28faa59eac88 Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 26 Oct 2023 13:44:34 -0300 Subject: [PATCH 06/27] corrigir parametros das tabelas --- pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 5ccbe5abd..9c6fa7748 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -331,7 +331,7 @@ flow_name=unmapped(bilhetagem_ressarcimento_captura.name), # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), project_name=unmapped("staging"), - parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, + parameters=constants.BILHETAGEM_RESSARCIMENTO_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) From 994bb44364b0df9f534e1ca1db9736c32a0889c6 Mon Sep 17 00:00:00 2001 From: Rafael Date: Fri, 27 Oct 2023 08:24:26 -0300 Subject: [PATCH 07/27] adicionar tabela auxiliar consorcio --- pipelines/rj_smtr/constants.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index c5c111c07..20a872b62 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -358,6 +358,24 @@ class constants(Enum): # pylint: disable=c0103 "primary_key": ["id_operadora"], # id column to nest data on "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, }, + { + "table_id": "consorcio", + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + CONSORCIO + WHERE + DT_INCLUSAO BETWEEN '{start}' + AND '{end}' + """, + }, + "primary_key": ["CD_CONSORCIO"], # id column to nest data on + "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, + }, ] BILHETAGEM_MATERIALIZACAO_PARAMS = { From a361b966ac999f63b4834b414792b72c3657a51f Mon Sep 17 00:00:00 2001 From: Rafael Date: Fri, 27 Oct 2023 08:41:18 -0300 Subject: [PATCH 08/27] ajustar default params --- pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 8a04f1db3..8fc1bf6b4 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -90,7 +90,7 @@ bilhetagem_ressarcimento_captura = set_default_parameters( flow=bilhetagem_ressarcimento_captura, - default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS, + default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value, ) # BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO # From 59ea93ae9c170c34d34d593489e61e1286dd5c54 Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 10:52:36 -0300 Subject: [PATCH 09/27] adicionar pessoa juridica e operadora transporte --- pipelines/rj_smtr/constants.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index 5ee9f2a1e..fb1651917 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -344,7 +344,7 @@ class constants(Enum): # pylint: disable=c0103 "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, }, { - "table_id": "dw_dm_operadora", + "table_id": "operadora_transporte", "partition_date_only": True, "extract_params": { "database": "principal_db", @@ -352,10 +352,28 @@ class constants(Enum): # pylint: disable=c0103 SELECT * FROM - dw_dm_operadora + OPERADORA_TRANSPORTE + WHERE + DT_INCLUSAO BETWEEN '{start}' + AND '{end}' + """, + }, + "primary_key": ["CD_OPERADORA_TRANSPORTE"], # id column to nest data on + "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, + }, + { + "table_id": "pessoa_juridica", + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + PESSOA_JURIDICA """, }, - "primary_key": ["id_operadora"], # id column to nest data on + "primary_key": ["CD_CLIENTE"], # id column to nest data on "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, }, { From c41ca0f6794e92c54cc053227b2e57cdd6d9c1e2 Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 11:29:49 -0300 Subject: [PATCH 10/27] adicionar tabela linha consorcio --- pipelines/rj_smtr/constants.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index fb1651917..44837863e 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -394,6 +394,24 @@ class constants(Enum): # pylint: disable=c0103 "primary_key": ["CD_CONSORCIO"], # id column to nest data on "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, }, + { + "table_id": "linha_consorcio", + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + LINHA_CONSORCIO + WHERE + DT_INCLUSAO BETWEEN '{start}' + AND '{end}' + """, + }, + "primary_key": ["CD_CONSORCIO", "CD_LINHA"], # id column to nest data on + "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, + }, ] BILHETAGEM_MATERIALIZACAO_PARAMS = { From f58147f7e28c26ef735b8ec6deb7848a6f2bfb9d Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 17:40:36 -0300 Subject: [PATCH 11/27] =?UTF-8?q?adicionar=20incremental=20na=20captura=20?= =?UTF-8?q?liquida=C3=A7=C3=A3o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/rj_smtr/constants.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index 44837863e..367222c80 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -230,7 +230,7 @@ class constants(Enum): # pylint: disable=c0103 "interval_minutes": 1, } - BILHETAGEM_RESSARCIMENTO_CAPTURE_PARAMS = [ + BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS = [ { "table_id": "ordem_ressarcimento", "partition_date_only": True, @@ -241,10 +241,13 @@ class constants(Enum): # pylint: disable=c0103 * FROM ordem_ressarcimento + WHERE + data_inclusao BETWEEN '{start}' + AND '{end}' """, }, "primary_key": ["id"], - "interval_minutes": 1, + "interval_minutes": 1440, }, { "table_id": "ordem_pagamento", @@ -256,10 +259,13 @@ class constants(Enum): # pylint: disable=c0103 * FROM ordem_pagamento + WHERE + data_inclusao BETWEEN '{start}' + AND '{end}' """, }, "primary_key": ["id"], - "interval_minutes": 1, + "interval_minutes": 1440, }, ] @@ -414,7 +420,8 @@ class constants(Enum): # pylint: disable=c0103 }, ] - BILHETAGEM_MATERIALIZACAO_PARAMS = { + BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = { + "dataset_id": BILHETAGEM_DATASET_ID, "table_id": BILHETAGEM_TRANSACAO_CAPTURE_PARAMS["table_id"], "upstream": True, "dbt_vars": { @@ -426,6 +433,20 @@ class constants(Enum): # pylint: disable=c0103 }, } + BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = { + "dataset_id": BILHETAGEM_DATASET_ID, + "table_id": "ordem_pagamento_validacao", + "upstream": True, + "exclude": f"+{BILHETAGEM_TRANSACAO_CAPTURE_PARAMS['table_id']}", + "dbt_vars": { + "date_range": { + "table_run_datetime_column_name": "data", + "delay_hours": 0, + }, + "version": {}, + }, + } + BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS = { "dataset_id": BILHETAGEM_DATASET_ID, "secret_path": BILHETAGEM_SECRET_PATH, From dd3e8fecbea1e538a430999094239e4b03c482a4 Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 17:49:19 -0300 Subject: [PATCH 12/27] =?UTF-8?q?criar=20flow=20liquida=C3=A7=C3=A3o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 130 +++++++++++++++--- pipelines/rj_smtr/constants.py | 2 +- 2 files changed, 111 insertions(+), 21 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 8fc1bf6b4..d81a8de0e 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -108,17 +108,36 @@ default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value, ) -# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO # +# MATERIALIZAÇÃO # -bilhetagem_materializacao = deepcopy(default_materialization_flow) -bilhetagem_materializacao.name = "SMTR: Bilhetagem Transação - Materialização (subflow)" -bilhetagem_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) -bilhetagem_materializacao.run_config = KubernetesRun( +# Transação +bilhetagem_materializacao_transacao = deepcopy(default_materialization_flow) +bilhetagem_materializacao_transacao.name = ( + "SMTR: Bilhetagem Transação - Materialização (subflow)" +) +bilhetagem_materializacao_transacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_materializacao_transacao.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) -bilhetagem_materializacao_parameters = { +bilhetagem_materializacao_transacao_parameters = { + "source_table_ids": [ + constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"] + ] + + [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value], + "capture_intervals_minutes": [ + constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"] + ] + + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value], +} | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value + +bilhetagem_materializacao_transacao = set_default_parameters( + flow=bilhetagem_materializacao_transacao, + default_parameters=bilhetagem_materializacao_transacao_parameters, +) + +bilhetagem_materializacao_transacao_parameters = { "source_table_ids": [ constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"] ] @@ -127,14 +146,42 @@ constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"] ] + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value], - "dataset_id": constants.BILHETAGEM_DATASET_ID.value, -} | constants.BILHETAGEM_MATERIALIZACAO_PARAMS.value +} | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value -bilhetagem_materializacao = set_default_parameters( - flow=bilhetagem_materializacao, - default_parameters=bilhetagem_materializacao_parameters, + +# Ordem Pagamento + +bilhetagem_materializacao_ordem_pagamento = deepcopy(default_materialization_flow) +bilhetagem_materializacao_ordem_pagamento.name = ( + "SMTR: Bilhetagem Ordem Pagamento - Materialização (subflow)" +) +bilhetagem_materializacao_ordem_pagamento.storage = GCS( + emd_constants.GCS_FLOWS_BUCKET.value +) +bilhetagem_materializacao_ordem_pagamento.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], ) +bilhetagem_materializacao_ordem_pagamento_parameters = { + "source_table_ids": [ + constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"] + ] + + [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] + + [ + d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value + ], + "capture_intervals_minutes": [ + constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"] + ] + + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] + + [ + d["interval_minutes"] + for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value + ], +} | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value + + # RECAPTURA # bilhetagem_recaptura = deepcopy(default_capture_flow) @@ -239,7 +286,7 @@ with case(materialize, True): # Materialização run_materializacao = create_flow_run( - flow_name=bilhetagem_materializacao.name, + flow_name=bilhetagem_materializacao_transacao.name, project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, labels=LABELS, upstream_tasks=[ @@ -248,7 +295,9 @@ wait_recaptura_transacao, ], parameters={ - "timestamp": get_current_timestamp(timestamp=timestamp, return_str=True) + "timestamp": get_current_timestamp( + timestamp=timestamp, return_str=True + ), }, ) @@ -307,25 +356,27 @@ bilhetagem_gps_tratamento.schedule = every_hour with Flow( - "SMTR: Bilhetagem Ressarcimento - Captura/Tratamento", + "SMTR: Bilhetagem Ordem Pagamento - Captura/Tratamento", code_owners=["caio", "fernanda", "boris", "rodrigo"], -) as bilhetagem_ressarcimento_captura_tratamento: +) as bilhetagem_ordem_pagamento_captura_tratamento: timestamp = get_rounded_timestamp( interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value ) rename_flow_run = rename_current_flow_run_now_time( - prefix=bilhetagem_ressarcimento_captura_tratamento.name + " ", + prefix=bilhetagem_ordem_pagamento_captura_tratamento.name + " ", now_time=timestamp, ) LABELS = get_current_flow_labels() + # Captura # + runs_captura = create_flow_run.map( - flow_name=unmapped(bilhetagem_ressarcimento_captura.name), + flow_name=unmapped(bilhetagem_ordem_pagamento_captura_tratamento.name), # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), project_name=unmapped("staging"), - parameters=constants.BILHETAGEM_RESSARCIMENTO_CAPTURE_PARAMS.value, + parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) @@ -336,10 +387,49 @@ raise_final_state=unmapped(True), ) -bilhetagem_ressarcimento_captura_tratamento.storage = GCS( + # Recaptura # + + runs_recaptura = create_flow_run.map( + flow_name=unmapped(bilhetagem_recaptura.name), + # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped("staging"), + parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, + labels=unmapped(LABELS), + ) + + runs_recaptura.set_upstream(wait_captura) + + wait_recaptura = wait_for_flow_run.map( + runs_recaptura, + stream_states=unmapped(True), + stream_logs=unmapped(True), + raise_final_state=unmapped(True), + ) + + # Materialização # + + run_materializacao = create_flow_run( + flow_name=bilhetagem_materializacao_ordem_pagamento.name, + # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, + project_name="staging", + labels=LABELS, + upstream_tasks=[wait_recaptura], + parameters={ + "timestamp": get_current_timestamp(timestamp=timestamp, return_str=True), + }, + ) + + wait_materializacao = wait_for_flow_run( + run_materializacao, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + +bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS( emd_constants.GCS_FLOWS_BUCKET.value ) -bilhetagem_ressarcimento_captura_tratamento.run_config = KubernetesRun( +bilhetagem_ordem_pagamento_captura_tratamento.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index 367222c80..f8ba4e19f 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -437,7 +437,7 @@ class constants(Enum): # pylint: disable=c0103 "dataset_id": BILHETAGEM_DATASET_ID, "table_id": "ordem_pagamento_validacao", "upstream": True, - "exclude": f"+{BILHETAGEM_TRANSACAO_CAPTURE_PARAMS['table_id']}", + "exclude": f"+{BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS['table_id']}", "dbt_vars": { "date_range": { "table_run_datetime_column_name": "data", From 370acba6ceb5c6e2f55e431ba9e75fd70a2f2f6a Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 18:04:23 -0300 Subject: [PATCH 13/27] adicionar parametros de controle de run --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 101 ++++++++++-------- 1 file changed, 57 insertions(+), 44 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index d81a8de0e..5b9984143 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -359,6 +359,9 @@ "SMTR: Bilhetagem Ordem Pagamento - Captura/Tratamento", code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as bilhetagem_ordem_pagamento_captura_tratamento: + capture = Parameter("capture", default=True) + materialize = Parameter("materialize", default=True) + timestamp = get_rounded_timestamp( interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value ) @@ -371,60 +374,70 @@ LABELS = get_current_flow_labels() # Captura # + with case(capture, True): + runs_captura = create_flow_run.map( + flow_name=unmapped(bilhetagem_ordem_pagamento_captura_tratamento.name), + # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped("staging"), + parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, + labels=unmapped(LABELS), + ) - runs_captura = create_flow_run.map( - flow_name=unmapped(bilhetagem_ordem_pagamento_captura_tratamento.name), - # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), - project_name=unmapped("staging"), - parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, - labels=unmapped(LABELS), - ) + wait_captura = wait_for_flow_run.map( + runs_captura, + stream_states=unmapped(True), + stream_logs=unmapped(True), + raise_final_state=unmapped(True), + ) - wait_captura = wait_for_flow_run.map( - runs_captura, - stream_states=unmapped(True), - stream_logs=unmapped(True), - raise_final_state=unmapped(True), - ) + # Recaptura # - # Recaptura # + runs_recaptura = create_flow_run.map( + flow_name=unmapped(bilhetagem_recaptura.name), + # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped("staging"), + parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, + labels=unmapped(LABELS), + ) - runs_recaptura = create_flow_run.map( - flow_name=unmapped(bilhetagem_recaptura.name), - # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), - project_name=unmapped("staging"), - parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, - labels=unmapped(LABELS), - ) + runs_recaptura.set_upstream(wait_captura) - runs_recaptura.set_upstream(wait_captura) + wait_recaptura_true = wait_for_flow_run.map( + runs_recaptura, + stream_states=unmapped(True), + stream_logs=unmapped(True), + raise_final_state=unmapped(True), + ) - wait_recaptura = wait_for_flow_run.map( - runs_recaptura, - stream_states=unmapped(True), - stream_logs=unmapped(True), - raise_final_state=unmapped(True), - ) + with case(capture, False): + wait_recaptura_false = task( + lambda: None, name="assign_none_to_recapture", nout=3 + )() + + wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false) # Materialização # - run_materializacao = create_flow_run( - flow_name=bilhetagem_materializacao_ordem_pagamento.name, - # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, - project_name="staging", - labels=LABELS, - upstream_tasks=[wait_recaptura], - parameters={ - "timestamp": get_current_timestamp(timestamp=timestamp, return_str=True), - }, - ) + with case(materialize, True): + run_materializacao = create_flow_run( + flow_name=bilhetagem_materializacao_ordem_pagamento.name, + # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, + project_name="staging", + labels=LABELS, + upstream_tasks=[wait_recaptura], + parameters={ + "timestamp": get_current_timestamp( + timestamp=timestamp, return_str=True + ), + }, + ) - wait_materializacao = wait_for_flow_run( - run_materializacao, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) + wait_materializacao = wait_for_flow_run( + run_materializacao, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS( emd_constants.GCS_FLOWS_BUCKET.value From 1c68bff7a65e55b80b04a0523b0792b64548001e Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 18:05:08 -0300 Subject: [PATCH 14/27] comentar source tables para teste --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 5b9984143..62a2739dc 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -164,21 +164,21 @@ ) bilhetagem_materializacao_ordem_pagamento_parameters = { - "source_table_ids": [ - constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"] - ] - + [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] - + [ - d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value - ], - "capture_intervals_minutes": [ - constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"] - ] - + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] - + [ - d["interval_minutes"] - for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value - ], + # "source_table_ids": [ + # constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"] + # ] + # + [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] + # + [ + # d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value + # ], + # "capture_intervals_minutes": [ + # constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"] + # ] + # + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] + # + [ + # d["interval_minutes"] + # for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value + # ], } | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value From cd7781c1707858a365f304d854443c570baecf4e Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 18:16:32 -0300 Subject: [PATCH 15/27] alterar agent para dev --- pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 62a2739dc..5ae6ba6f3 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -444,5 +444,5 @@ ) bilhetagem_ordem_pagamento_captura_tratamento.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, - labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], + labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], ) From 4eddb6e337a17ffde1543680d7d74a31da9903e1 Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 19:03:44 -0300 Subject: [PATCH 16/27] =?UTF-8?q?adicionar=20parametros=20default=20materi?= =?UTF-8?q?aliza=C3=A7=C3=A3o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 5ae6ba6f3..57935cf3d 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -137,18 +137,6 @@ default_parameters=bilhetagem_materializacao_transacao_parameters, ) -bilhetagem_materializacao_transacao_parameters = { - "source_table_ids": [ - constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"] - ] - + [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value], - "capture_intervals_minutes": [ - constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"] - ] - + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value], -} | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value - - # Ordem Pagamento bilhetagem_materializacao_ordem_pagamento = deepcopy(default_materialization_flow) @@ -179,7 +167,12 @@ # d["interval_minutes"] # for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value # ], -} | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value +} | constants.BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS.value + +bilhetagem_materializacao_ordem_pagamento = set_default_parameters( + flow=bilhetagem_materializacao_ordem_pagamento, + default_parameters=bilhetagem_materializacao_ordem_pagamento_parameters, +) # RECAPTURA # From f18547771f01e976849eba3b420f0763ce6ff30f Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 1 Nov 2023 19:34:31 -0300 Subject: [PATCH 17/27] usar campo de data no daterange --- pipelines/rj_smtr/tasks.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index f927a02f9..21e71d5b5 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -3,7 +3,7 @@ """ Tasks for rj_smtr """ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, date import json import os from pathlib import Path @@ -1087,6 +1087,9 @@ def get_materialization_date_range( # pylint: disable=R0913 else: last_run = datetime.strptime(last_run, timestr) + if isinstance(last_run, date): + last_run = datetime(last_run.year, last_run.month, last_run.day) + # set start to last run hour (H) start_ts = last_run.replace(minute=0, second=0, microsecond=0).strftime(timestr) From 4a22f37580c9f57dd9e819757dd8946269121c2c Mon Sep 17 00:00:00 2001 From: Rafael Date: Mon, 6 Nov 2023 09:25:40 -0300 Subject: [PATCH 18/27] descomentar parametros teste --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 57935cf3d..cee1665d6 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -152,21 +152,21 @@ ) bilhetagem_materializacao_ordem_pagamento_parameters = { - # "source_table_ids": [ - # constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"] - # ] - # + [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] - # + [ - # d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value - # ], - # "capture_intervals_minutes": [ - # constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"] - # ] - # + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] - # + [ - # d["interval_minutes"] - # for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value - # ], + "source_table_ids": [ + constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"] + ] + + [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] + + [ + d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value + ], + "capture_intervals_minutes": [ + constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"] + ] + + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value] + + [ + d["interval_minutes"] + for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value + ], } | constants.BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS.value bilhetagem_materializacao_ordem_pagamento = set_default_parameters( From feb0856bfcd634ae12f496f545a7a623c14628b8 Mon Sep 17 00:00:00 2001 From: Rafael Date: Mon, 6 Nov 2023 09:26:27 -0300 Subject: [PATCH 19/27] corrigir nout --- pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index cee1665d6..04a5f638c 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -403,9 +403,7 @@ ) with case(capture, False): - wait_recaptura_false = task( - lambda: None, name="assign_none_to_recapture", nout=3 - )() + wait_recaptura_false = task(lambda: None, name="assign_none_to_recapture")() wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false) From 40da6644f0452766a3bf515424a52624572f0f73 Mon Sep 17 00:00:00 2001 From: Rafael Date: Mon, 6 Nov 2023 11:06:00 -0300 Subject: [PATCH 20/27] adicionar log para testes --- pipelines/rj_smtr/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 21e71d5b5..9715d5d75 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -659,6 +659,8 @@ def create_request_params( timestamp=timestamp, interval=timedelta(minutes=interval_minutes) ) + log(f"datetime_range = {datetime_range}") + request_params = { "database": extract_params["database"], "engine": database["engine"], From ab0bc7a447129c39b314113c813164fa0845106b Mon Sep 17 00:00:00 2001 From: Rafael Date: Mon, 6 Nov 2023 11:36:14 -0300 Subject: [PATCH 21/27] corrigir nome subflow --- pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 04a5f638c..ce4dd3587 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -229,7 +229,8 @@ runs_captura = create_flow_run.map( flow_name=unmapped(bilhetagem_auxiliar_captura.name), - project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped("staging"), parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) @@ -245,7 +246,8 @@ runs_recaptura_auxiliar = create_flow_run.map( flow_name=unmapped(bilhetagem_recaptura.name), - project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + project_name=unmapped("staging"), parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) @@ -369,7 +371,7 @@ # Captura # with case(capture, True): runs_captura = create_flow_run.map( - flow_name=unmapped(bilhetagem_ordem_pagamento_captura_tratamento.name), + flow_name=unmapped(bilhetagem_ressarcimento_captura.name), # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), project_name=unmapped("staging"), parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, From 32a3b52feca6f85a0711e7913c191d5f0c7412d5 Mon Sep 17 00:00:00 2001 From: Rafael Date: Tue, 7 Nov 2023 16:25:23 -0300 Subject: [PATCH 22/27] adicionar schedule e reference tasks --- pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index ce4dd3587..eb4ef398f 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -36,7 +36,7 @@ from pipelines.rj_smtr.constants import constants -from pipelines.rj_smtr.schedules import every_hour, every_minute +from pipelines.rj_smtr.schedules import every_hour, every_minute, every_day # Flows # @@ -432,6 +432,10 @@ raise_final_state=True, ) + bilhetagem_ordem_pagamento_captura_tratamento.reference_tasks( + [wait_materializacao, wait_recaptura] + ) + bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS( emd_constants.GCS_FLOWS_BUCKET.value ) @@ -439,3 +443,6 @@ image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], ) + + +bilhetagem_ordem_pagamento_captura_tratamento.schedule = every_day From 9dbfb15c54de2f3570ebfe478ac4c3bc512d44f4 Mon Sep 17 00:00:00 2001 From: Rafael Date: Tue, 7 Nov 2023 16:45:26 -0300 Subject: [PATCH 23/27] =?UTF-8?q?ajuste=20fun=C3=A7=C3=A3o=20reference=20t?= =?UTF-8?q?ask?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index eb4ef398f..9bf519c3c 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -432,7 +432,7 @@ raise_final_state=True, ) - bilhetagem_ordem_pagamento_captura_tratamento.reference_tasks( + bilhetagem_ordem_pagamento_captura_tratamento.set_reference_tasks( [wait_materializacao, wait_recaptura] ) From 21b5f6f0fb390b39a5478fde9e727a427dc06d23 Mon Sep 17 00:00:00 2001 From: Rafael Date: Tue, 7 Nov 2023 17:56:07 -0300 Subject: [PATCH 24/27] mudar ambiente para prd --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 9bf519c3c..6fe4a20c7 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -148,7 +148,7 @@ ) bilhetagem_materializacao_ordem_pagamento.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, - labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) bilhetagem_materializacao_ordem_pagamento_parameters = { @@ -190,7 +190,7 @@ with Flow( "SMTR: Bilhetagem Transação - Tratamento", - code_owners=["caio", "fernanda", "boris", "rodrigo"], + code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"], ) as bilhetagem_transacao_tratamento: # Configuração # @@ -229,8 +229,7 @@ runs_captura = create_flow_run.map( flow_name=unmapped(bilhetagem_auxiliar_captura.name), - # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), - project_name=unmapped("staging"), + project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) @@ -246,8 +245,7 @@ runs_recaptura_auxiliar = create_flow_run.map( flow_name=unmapped(bilhetagem_recaptura.name), - # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), - project_name=unmapped("staging"), + project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) @@ -313,7 +311,7 @@ with Flow( "SMTR: Bilhetagem GPS Validador - Tratamento", - code_owners=["caio", "fernanda", "boris", "rodrigo"], + code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"], ) as bilhetagem_gps_tratamento: timestamp = get_rounded_timestamp( interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value @@ -352,7 +350,7 @@ with Flow( "SMTR: Bilhetagem Ordem Pagamento - Captura/Tratamento", - code_owners=["caio", "fernanda", "boris", "rodrigo"], + code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"], ) as bilhetagem_ordem_pagamento_captura_tratamento: capture = Parameter("capture", default=True) materialize = Parameter("materialize", default=True) @@ -372,8 +370,7 @@ with case(capture, True): runs_captura = create_flow_run.map( flow_name=unmapped(bilhetagem_ressarcimento_captura.name), - # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), - project_name=unmapped("staging"), + project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) @@ -389,8 +386,7 @@ runs_recaptura = create_flow_run.map( flow_name=unmapped(bilhetagem_recaptura.name), - # project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), - project_name=unmapped("staging"), + project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value, labels=unmapped(LABELS), ) @@ -441,7 +437,7 @@ ) bilhetagem_ordem_pagamento_captura_tratamento.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, - labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) From 9a1334e165b20cc330c6801367a38202241336c5 Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 8 Nov 2023 15:01:10 -0300 Subject: [PATCH 25/27] alterar query logs para escritas concorrentes --- pipelines/rj_smtr/tasks.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 9715d5d75..5f431dc59 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -10,6 +10,7 @@ import traceback from typing import Dict, List, Union, Iterable, Any import io +import time from basedosdados import Storage, Table import basedosdados as bd @@ -19,6 +20,7 @@ from prefect import task from pytz import timezone import requests +from google.api_core.exceptions import NotFound from pipelines.rj_smtr.constants import constants from pipelines.rj_smtr.utils import ( @@ -527,7 +529,16 @@ def query_logs( logs.sucesso IS NOT TRUE """ log(f"Run query to check logs:\n{query}") - results = bd.read_sql(query=query, billing_project_id=bq_project()) + retries = 5 + for i in range(retries): + try: + results = bd.read_sql(query=query, billing_project_id=bq_project()) + break + except NotFound as e: + log(e) + if i == retries - 1: + raise e + time.sleep(5) if len(results) > 0: results = results.sort_values(["timestamp_captura"]) results["timestamp_captura"] = ( @@ -659,8 +670,6 @@ def create_request_params( timestamp=timestamp, interval=timedelta(minutes=interval_minutes) ) - log(f"datetime_range = {datetime_range}") - request_params = { "database": extract_params["database"], "engine": database["engine"], From 6d7b156beca4904395fd1cdd2a125bf6a7888f76 Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 8 Nov 2023 16:33:19 -0300 Subject: [PATCH 26/27] =?UTF-8?q?altera=C3=A7=C3=B5es=20review?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py | 7 +++---- pipelines/rj_smtr/tasks.py | 14 +++----------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 6fe4a20c7..b8fdc4f3d 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -36,7 +36,7 @@ from pipelines.rj_smtr.constants import constants -from pipelines.rj_smtr.schedules import every_hour, every_minute, every_day +from pipelines.rj_smtr.schedules import every_hour, every_minute, every_day_hour_five # Flows # @@ -410,8 +410,7 @@ with case(materialize, True): run_materializacao = create_flow_run( flow_name=bilhetagem_materializacao_ordem_pagamento.name, - # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, - project_name="staging", + project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, labels=LABELS, upstream_tasks=[wait_recaptura], parameters={ @@ -441,4 +440,4 @@ ) -bilhetagem_ordem_pagamento_captura_tratamento.schedule = every_day +bilhetagem_ordem_pagamento_captura_tratamento.schedule = every_day_hour_five diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 5f431dc59..1d0601a0a 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -434,7 +434,7 @@ def save_treated_local(file_path: str, status: dict, mode: str = "staging") -> s # Extract data # ############### -@task(nout=3) +@task(nout=3, max_retries=3, retry_delay=timedelta(seconds=5)) def query_logs( dataset_id: str, table_id: str, @@ -529,16 +529,8 @@ def query_logs( logs.sucesso IS NOT TRUE """ log(f"Run query to check logs:\n{query}") - retries = 5 - for i in range(retries): - try: - results = bd.read_sql(query=query, billing_project_id=bq_project()) - break - except NotFound as e: - log(e) - if i == retries - 1: - raise e - time.sleep(5) + results = bd.read_sql(query=query, billing_project_id=bq_project()) + if len(results) > 0: results = results.sort_values(["timestamp_captura"]) results["timestamp_captura"] = ( From 7a3ad6ebd787c6422c147f30ce0ed42296e2b248 Mon Sep 17 00:00:00 2001 From: Rafael Date: Wed, 8 Nov 2023 16:47:45 -0300 Subject: [PATCH 27/27] =?UTF-8?q?remover=20imports=20n=C3=A3o=20utilizados?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipelines/rj_smtr/tasks.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 1d0601a0a..fbf86fe3e 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -10,7 +10,6 @@ import traceback from typing import Dict, List, Union, Iterable, Any import io -import time from basedosdados import Storage, Table import basedosdados as bd @@ -20,7 +19,6 @@ from prefect import task from pytz import timezone import requests -from google.api_core.exceptions import NotFound from pipelines.rj_smtr.constants import constants from pipelines.rj_smtr.utils import (