diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py index 9c02f912f..3f732f265 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py @@ -171,7 +171,7 @@ # Set specific run parameters # with case(rematerialization, False): - rematerialization_dates_false = date_range = get_materialization_date_range( + date_range_false = get_materialization_date_range( dataset_id=dataset_id, table_id=table_id, raw_dataset_id=raw_dataset_id, @@ -180,20 +180,26 @@ mode=MODE, delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value, ) + + RUN_CLEAN_FALSE = task( + lambda: [None], + checkpoint=False, + name="assign_none_to_previous_runs", + )() with case(rematerialization, True): - date_range = { + date_range_true = { "date_range_start": date_range_start, "date_range_end": date_range_end, } - rematerialization_dates_true = clean_br_rj_riodejaneiro_onibus_gps(date_range) + RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true) - rematerialization_dates = merge( - rematerialization_dates_true, rematerialization_dates_false - ) + RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE) + + date_range = merge(date_range_true, date_range_false) dataset_sha = fetch_dataset_sha( dataset_id=dataset_id, - upstream_tasks=[rematerialization_dates], + upstream_tasks=[RUN_CLEAN], ) # Run materialization # @@ -238,7 +244,7 @@ SET = merge(SET_TRUE, SET_FALSE) - materialize_sppo.set_reference_tasks([RUN, rematerialization_dates, SET]) + materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET]) materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) materialize_sppo.run_config = KubernetesRun(