Skip to content

Commit

Permalink
Merge branch 'master' into staging/sme_frequencia
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Apr 30, 2024
2 parents aeead6b + 34e2f27 commit ac288c9
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@

# Set specific run parameters #
with case(rematerialization, False):
rematerialization_dates_false = date_range = get_materialization_date_range(
date_range_false = get_materialization_date_range(
dataset_id=dataset_id,
table_id=table_id,
raw_dataset_id=raw_dataset_id,
Expand All @@ -180,20 +180,26 @@
mode=MODE,
delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
)

RUN_CLEAN_FALSE = task(
lambda: [None],
checkpoint=False,
name="assign_none_to_previous_runs",
)()
with case(rematerialization, True):
date_range = {
date_range_true = {
"date_range_start": date_range_start,
"date_range_end": date_range_end,
}
rematerialization_dates_true = clean_br_rj_riodejaneiro_onibus_gps(date_range)
RUN_CLEAN_TRUE = clean_br_rj_riodejaneiro_onibus_gps(date_range_true)

rematerialization_dates = merge(
rematerialization_dates_true, rematerialization_dates_false
)
RUN_CLEAN = merge(RUN_CLEAN_TRUE, RUN_CLEAN_FALSE)

date_range = merge(date_range_true, date_range_false)

dataset_sha = fetch_dataset_sha(
dataset_id=dataset_id,
upstream_tasks=[rematerialization_dates],
upstream_tasks=[RUN_CLEAN],
)

# Run materialization #
Expand Down Expand Up @@ -238,7 +244,7 @@

SET = merge(SET_TRUE, SET_FALSE)

materialize_sppo.set_reference_tasks([RUN, rematerialization_dates, SET])
materialize_sppo.set_reference_tasks([RUN, RUN_CLEAN, SET])

materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
materialize_sppo.run_config = KubernetesRun(
Expand Down

0 comments on commit ac288c9

Please sign in to comment.