From 34e2f27d826e9020d2bc863dc22a39db39eeace5 Mon Sep 17 00:00:00 2001 From: Rodrigo Cunha <66736583+eng-rodrigocunha@users.noreply.github.com> Date: Tue, 30 Apr 2024 07:57:23 -0300 Subject: [PATCH] Tratamento GPS SPPO (#674) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * commit inicial * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * corrige clean_br_rj_riodejaneiro_onibus_gps * corrige log clean_br_rj_riodejaneiro_onibus_gps * atualiza changelog * enriquece log * Corrige queries * atualiza tasks de referência * Altera agentes para produção * altera nomes de variaveis * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Atualiza docstring * refactor materialize_sppo * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Rafael Carvalho Pinheiro <74972217+pixuimpou@users.noreply.github.com> --- .../br_rj_riodejaneiro_onibus_gps/flows.py | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 9c02f912f..3f732f265 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -171,7 +171,7 @@ # Set specific run parameters # with case(rematerialization, False): - rematerialization_dates_false = date_range = get_materialization_date_range( + date_range_false = get_materialization_date_range( dataset_id=dataset_id, table_id=table_id, raw_dataset_id=raw_dataset_id, @@ -180,20 +180,26 @@ mode=MODE, delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value, ) + + RUN_CLEAN_FALSE = task( + lambda: [None], + checkpoint=False, + name="assign_none_to_previous_runs", + )() with case(rematerialization, True): - date_range = { + date_range_true = { "date_range_start": date_range_start, "date_range_end": date_range_end, } - rematerialization_dates_true = clean_br_rj_riodejaneiro_onibus_gps(date_range) + RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true) - rematerialization_dates = merge( - rematerialization_dates_true, rematerialization_dates_false - ) + RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE) + + date_range = merge(date_range_true, date_range_false) dataset_sha = fetch_dataset_sha( dataset_id=dataset_id, - upstream_tasks=[rematerialization_dates], + upstream_tasks=[RUN_CLEAN], ) # Run materialization # @@ -238,7 +244,7 @@ SET = merge(SET_TRUE, SET_FALSE) - materialize_sppo.set_reference_tasks([RUN, rematerialization_dates, SET]) + materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET]) materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) materialize_sppo.run_config = KubernetesRun(