From 0fe7d64c6afbdc37fefe60dcdc409c5706095565 Mon Sep 17 00:00:00 2001 From: Rafael Carvalho Pinheiro <74972217+pixuimpou@users.noreply.github.com> Date: Fri, 27 Oct 2023 17:07:55 -0300 Subject: [PATCH] Padroniza nomes dos flows (#526) * altera nomes gps sppo * alterar nomes * corrigir rename run * retirar variavel de nome * ajusta nome de flows * corrige nome sppo_rho_materialize --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: eng-rodrigocunha --- .../rj_smtr/br_rj_riodejaneiro_brt_gps/flows.py | 4 ++-- .../rj_smtr/br_rj_riodejaneiro_gtfs/flows.py | 4 ++-- .../br_rj_riodejaneiro_onibus_gps/flows.py | 14 +++++++------- pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py | 10 ++++++---- .../rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py | 2 +- pipelines/rj_smtr/projeto_subsidio_sppo/flows.py | 9 ++++----- pipelines/rj_smtr/registros_ocr_rir/flows.py | 4 ++-- pipelines/rj_smtr/veiculo/flows.py | 15 ++++++--------- 8 files changed, 30 insertions(+), 32 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_brt_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_brt_gps/flows.py index 006a81cc4..7bc881258 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_brt_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_brt_gps/flows.py @@ -56,7 +56,7 @@ ) as materialize_brt: # Rename flow run rename_flow_run = rename_current_flow_run_now_time( - prefix="GPS BRT - Materialização: ", now_time=get_now_time() + prefix=materialize_brt.name + ": ", now_time=get_now_time() ) # Get default parameters # @@ -143,7 +143,7 @@ # Rename flow run rename_flow_run = rename_current_flow_run_now_time( - prefix="SMTR: GPS BRT - Captura - ", now_time=timestamp + prefix=captura_brt.name + ": ", now_time=timestamp ) # SETUP LOCAL # diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py index bd60dc963..97a3280b2 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py @@ -35,7 +35,7 @@ # SETUP dos Flows gtfs_captura = deepcopy(default_capture_flow) -gtfs_captura.name = "SMTR - Captura dos dados do GTFS" +gtfs_captura.name = "SMTR: GTFS - Captura (subflow)" gtfs_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) gtfs_captura.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, @@ -48,7 +48,7 @@ ) gtfs_materializacao = deepcopy(default_materialization_flow) -gtfs_materializacao.name = "SMTR - Materialização dos dados do GTFS" +gtfs_materializacao.name = "SMTR: GTFS - Materialização (subflow)" gtfs_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) gtfs_materializacao.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 1b11e5dd0..395815137 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -59,7 +59,7 @@ # Flows # with Flow( - "SMTR: GPS SPPO - Realocação (captura)", + "SMTR: GPS SPPO Realocação - Captura", code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as realocacao_sppo: # SETUP # @@ -81,7 +81,7 @@ timestamp = get_current_timestamp() rename_flow_run = rename_current_flow_run_now_time( - prefix="GPS SPPO - Realocação: ", now_time=timestamp + prefix=realocacao_sppo.name + ": ", now_time=timestamp ) partitions = create_date_hour_partition(timestamp) @@ -135,12 +135,12 @@ with Flow( - "SMTR: GPS SPPO - Materialização", + "SMTR: GPS SPPO - Materialização (subflow)", code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as materialize_sppo: # Rename flow run rename_flow_run = rename_current_flow_run_now_time( - prefix="SMTR: GPS SPPO - Materialização - ", now_time=get_now_time() + prefix=materialize_sppo.name + ": ", now_time=get_now_time() ) # Get default parameters # @@ -228,7 +228,7 @@ timestamp = get_current_timestamp() rename_flow_run = rename_current_flow_run_now_time( - prefix="GPS SPPO: ", now_time=timestamp + prefix=captura_sppo_v2.name + ": ", now_time=timestamp ) partitions = create_date_hour_partition(timestamp) @@ -282,7 +282,7 @@ with Flow( - "SMTR - GPS SPPO Recapturas", code_owners=["caio", "fernanda", "boris", "rodrigo"] + "SMTR: GPS SPPO - Tratamento", code_owners=["caio", "fernanda", "boris", "rodrigo"] ) as recaptura: version = Parameter("version", default=2) datetime_filter = Parameter("datetime_filter", default=None) @@ -297,7 +297,7 @@ ) rename_flow_run = rename_current_flow_run_now_time( - prefix="GPS SPPO Recapturas: ", now_time=get_now_time(), wait=timestamps + prefix=recaptura.name + ": ", now_time=get_now_time(), wait=timestamps ) with case(errors, False): with case(materialize, True): diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py index efd84e49c..c6a4a09eb 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py @@ -37,10 +37,12 @@ ) from pipelines.utils.execute_dbt_model.tasks import run_dbt_model -with Flow("SMTR: SPPO RHO - Materialização") as sppo_rho_materialize: +with Flow( + "SMTR: SPPO RHO - Materialização", code_owners=["rodrigo"] +) as sppo_rho_materialize: # Rename flow run rename_flow_run = rename_current_flow_run_now_time( - prefix="SPPO RHO - Materialização: ", now_time=get_now_time() + prefix=sppo_rho_materialize.name + ": ", now_time=get_now_time() ) # Get default parameters # @@ -108,7 +110,7 @@ materialize = Parameter("materialize", False) rename_run = rename_current_flow_run_now_time( - prefix=f"Captura FTP - {transport_mode.run()}-{report_type.run()} ", + prefix=f"{captura_sppo_rho.name} FTP - {transport_mode.run()}-{report_type.run()} ", now_time=get_current_timestamp(), wait=None, ) @@ -156,7 +158,7 @@ materialize = Parameter("materialize", False) rename_run = rename_current_flow_run_now_time( - prefix=f"Captura FTP - {transport_mode.run()}-{report_type.run()} ", + prefix=f"{captura_sppo_rdo.name} FTP - {transport_mode.run()}-{report_type.run()} ", now_time=get_current_timestamp(), wait=None, ) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py index 7d8cf1574..9536a97ae 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py @@ -56,7 +56,7 @@ timestamp = get_current_timestamp() rename_flow_run = rename_current_flow_run_now_time( - prefix="SMTR: GPS STPL - Captura - ", now_time=timestamp + prefix=captura_stpl.name + " - ", now_time=timestamp ) partitions = create_date_hour_partition(timestamp) diff --git a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py index d40a1d3e1..03a2a96e3 100644 --- a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py +++ b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py @@ -52,7 +52,7 @@ # Flows # with Flow( - "SMTR: Viagens SPPO", + "SMTR: Viagens SPPO - Materialização", code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as viagens_sppo: # Rename flow run @@ -65,7 +65,7 @@ run_dates = get_run_dates(date_range_start, date_range_end) rename_flow_run = rename_current_flow_run_now_time( - prefix="SMTR - Viagens SPPO: ", now_time=run_dates + prefix=viagens_sppo.name + ": ", now_time=run_dates ) LABELS = get_current_flow_labels() @@ -98,9 +98,8 @@ viagens_sppo.schedule = every_day_hour_five -SUBSIDIO_SPPO_APURACAO_NAME = "SMTR: Subsídio SPPO Apuração" with Flow( - SUBSIDIO_SPPO_APURACAO_NAME, + "SMTR: Subsídio SPPO Apuração - Materialização", code_owners=["rodrigo"], ) as subsidio_sppo_apuracao: # 1. SETUP # @@ -137,7 +136,7 @@ # Rename flow run # rename_flow_run = rename_current_flow_run_now_time( - prefix=SUBSIDIO_SPPO_APURACAO_NAME + ": ", now_time=run_dates + prefix=subsidio_sppo_apuracao.name + ": ", now_time=run_dates ) # Set dbt client # diff --git a/pipelines/rj_smtr/registros_ocr_rir/flows.py b/pipelines/rj_smtr/registros_ocr_rir/flows.py index 26363916d..c1bd7b64f 100644 --- a/pipelines/rj_smtr/registros_ocr_rir/flows.py +++ b/pipelines/rj_smtr/registros_ocr_rir/flows.py @@ -21,7 +21,7 @@ from pipelines.utils.tasks import rename_current_flow_run_now_time with Flow( - "SMTR - Captura FTP - OCR RIR", + "SMTR: OCR RIR - Captura", code_owners=[ "caio", "fernanda", @@ -33,7 +33,7 @@ dump = Parameter("dump", default=False) execution_time = Parameter("execution_time", default=None) rename_flow = rename_current_flow_run_now_time( - prefix="OCR RIR: ", now_time=get_current_timestamp() + prefix=captura_ocr.name + " ", now_time=get_current_timestamp() ) # Pipeline status = get_files_from_ftp( diff --git a/pipelines/rj_smtr/veiculo/flows.py b/pipelines/rj_smtr/veiculo/flows.py index e1fab515e..b25c9c38a 100644 --- a/pipelines/rj_smtr/veiculo/flows.py +++ b/pipelines/rj_smtr/veiculo/flows.py @@ -55,9 +55,8 @@ # Flows # # flake8: noqa: E501 -sppo_licenciamento_captura_name = f"SMTR: Captura - {constants.DATASET_ID.value}.{constants.SPPO_LICENCIAMENTO_TABLE_ID.value}" with Flow( - sppo_licenciamento_captura_name, + f"SMTR: {constants.DATASET_ID.value} {constants.SPPO_LICENCIAMENTO_TABLE_ID.value} - Captura", code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as sppo_licenciamento_captura: timestamp = get_current_timestamp() @@ -67,7 +66,7 @@ # Rename flow run rename_flow_run = rename_current_flow_run_now_time( - prefix=f"{sppo_licenciamento_captura_name} - ", now_time=timestamp + prefix=f"{sppo_licenciamento_captura.name} - ", now_time=timestamp ) # SETUP # @@ -124,9 +123,8 @@ ) sppo_licenciamento_captura.schedule = every_day_hour_seven -sppo_infracao_captura_name = f"SMTR: Captura - {constants.DATASET_ID.value}.{constants.SPPO_INFRACAO_TABLE_ID.value}" with Flow( - sppo_infracao_captura_name, + f"SMTR: {constants.DATASET_ID.value} {constants.SPPO_INFRACAO_TABLE_ID.value} - Captura", code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as sppo_infracao_captura: timestamp = get_current_timestamp() @@ -136,7 +134,7 @@ # Rename flow run rename_flow_run = rename_current_flow_run_now_time( - prefix=f"{sppo_infracao_captura_name} - ", now_time=timestamp + prefix=f"{sppo_infracao_captura.name} - ", now_time=timestamp ) # SETUP # @@ -192,9 +190,8 @@ sppo_infracao_captura.schedule = every_day_hour_seven # flake8: noqa: E501 -sppo_veiculo_dia_name = f"SMTR: Materialização - {constants.DATASET_ID.value}.{constants.SPPO_VEICULO_DIA_TABLE_ID.value}" with Flow( - sppo_veiculo_dia_name, + f"SMTR: {constants.DATASET_ID.value} {constants.SPPO_VEICULO_DIA_TABLE_ID.value} - Materialização (subflow)", code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as sppo_veiculo_dia: # 1. SETUP # @@ -208,7 +205,7 @@ # Rename flow run # rename_flow_run = rename_current_flow_run_now_time( - prefix=sppo_veiculo_dia_name + ": ", + prefix=sppo_veiculo_dia.name + ": ", now_time=run_dates, )