Skip to content

Commit

Permalink
Merge branch 'master' into staging/dump_alertario
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 27, 2023
2 parents 80269d4 + 0fe7d64 commit 907a9b9
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pipelines/rj_smtr/br_rj_riodejaneiro_brt_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
) as materialize_brt:
# Rename flow run
rename_flow_run = rename_current_flow_run_now_time(
prefix="GPS BRT - Materialização: ", now_time=get_now_time()
prefix=materialize_brt.name + ": ", now_time=get_now_time()
)

# Get default parameters #
Expand Down Expand Up @@ -143,7 +143,7 @@

# Rename flow run
rename_flow_run = rename_current_flow_run_now_time(
prefix="SMTR: GPS BRT - Captura - ", now_time=timestamp
prefix=captura_brt.name + ": ", now_time=timestamp
)

# SETUP LOCAL #
Expand Down
4 changes: 2 additions & 2 deletions pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
# SETUP dos Flows

gtfs_captura = deepcopy(default_capture_flow)
gtfs_captura.name = "SMTR - Captura dos dados do GTFS"
gtfs_captura.name = "SMTR: GTFS - Captura (subflow)"
gtfs_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
gtfs_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
Expand All @@ -48,7 +48,7 @@
)

gtfs_materializacao = deepcopy(default_materialization_flow)
gtfs_materializacao.name = "SMTR - Materialização dos dados do GTFS"
gtfs_materializacao.name = "SMTR: GTFS - Materialização (subflow)"
gtfs_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
gtfs_materializacao.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
Expand Down
14 changes: 7 additions & 7 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
# Flows #

with Flow(
"SMTR: GPS SPPO - Realocação (captura)",
"SMTR: GPS SPPO Realocação - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as realocacao_sppo:
# SETUP #
Expand All @@ -81,7 +81,7 @@
timestamp = get_current_timestamp()

rename_flow_run = rename_current_flow_run_now_time(
prefix="GPS SPPO - Realocação: ", now_time=timestamp
prefix=realocacao_sppo.name + ": ", now_time=timestamp
)

partitions = create_date_hour_partition(timestamp)
Expand Down Expand Up @@ -135,12 +135,12 @@


with Flow(
"SMTR: GPS SPPO - Materialização",
"SMTR: GPS SPPO - Materialização (subflow)",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as materialize_sppo:
# Rename flow run
rename_flow_run = rename_current_flow_run_now_time(
prefix="SMTR: GPS SPPO - Materialização - ", now_time=get_now_time()
prefix=materialize_sppo.name + ": ", now_time=get_now_time()
)

# Get default parameters #
Expand Down Expand Up @@ -228,7 +228,7 @@
timestamp = get_current_timestamp()

rename_flow_run = rename_current_flow_run_now_time(
prefix="GPS SPPO: ", now_time=timestamp
prefix=captura_sppo_v2.name + ": ", now_time=timestamp
)

partitions = create_date_hour_partition(timestamp)
Expand Down Expand Up @@ -282,7 +282,7 @@


with Flow(
"SMTR - GPS SPPO Recapturas", code_owners=["caio", "fernanda", "boris", "rodrigo"]
"SMTR: GPS SPPO - Tratamento", code_owners=["caio", "fernanda", "boris", "rodrigo"]
) as recaptura:
version = Parameter("version", default=2)
datetime_filter = Parameter("datetime_filter", default=None)
Expand All @@ -297,7 +297,7 @@
)

rename_flow_run = rename_current_flow_run_now_time(
prefix="GPS SPPO Recapturas: ", now_time=get_now_time(), wait=timestamps
prefix=recaptura.name + ": ", now_time=get_now_time(), wait=timestamps
)
with case(errors, False):
with case(materialize, True):
Expand Down
10 changes: 6 additions & 4 deletions pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
)
from pipelines.utils.execute_dbt_model.tasks import run_dbt_model

with Flow("SMTR: SPPO RHO - Materialização") as sppo_rho_materialize:
with Flow(
"SMTR: SPPO RHO - Materialização", code_owners=["rodrigo"]
) as sppo_rho_materialize:
# Rename flow run
rename_flow_run = rename_current_flow_run_now_time(
prefix="SPPO RHO - Materialização: ", now_time=get_now_time()
prefix=sppo_rho_materialize.name + ": ", now_time=get_now_time()
)

# Get default parameters #
Expand Down Expand Up @@ -108,7 +110,7 @@
materialize = Parameter("materialize", False)

rename_run = rename_current_flow_run_now_time(
prefix=f"Captura FTP - {transport_mode.run()}-{report_type.run()} ",
prefix=f"{captura_sppo_rho.name} FTP - {transport_mode.run()}-{report_type.run()} ",
now_time=get_current_timestamp(),
wait=None,
)
Expand Down Expand Up @@ -156,7 +158,7 @@
materialize = Parameter("materialize", False)

rename_run = rename_current_flow_run_now_time(
prefix=f"Captura FTP - {transport_mode.run()}-{report_type.run()} ",
prefix=f"{captura_sppo_rdo.name} FTP - {transport_mode.run()}-{report_type.run()} ",
now_time=get_current_timestamp(),
wait=None,
)
Expand Down
2 changes: 1 addition & 1 deletion pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
timestamp = get_current_timestamp()

rename_flow_run = rename_current_flow_run_now_time(
prefix="SMTR: GPS STPL - Captura - ", now_time=timestamp
prefix=captura_stpl.name + " - ", now_time=timestamp
)

partitions = create_date_hour_partition(timestamp)
Expand Down
9 changes: 4 additions & 5 deletions pipelines/rj_smtr/projeto_subsidio_sppo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
# Flows #

with Flow(
"SMTR: Viagens SPPO",
"SMTR: Viagens SPPO - Materialização",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as viagens_sppo:
# Rename flow run
Expand All @@ -65,7 +65,7 @@
run_dates = get_run_dates(date_range_start, date_range_end)

rename_flow_run = rename_current_flow_run_now_time(
prefix="SMTR - Viagens SPPO: ", now_time=run_dates
prefix=viagens_sppo.name + ": ", now_time=run_dates
)

LABELS = get_current_flow_labels()
Expand Down Expand Up @@ -98,9 +98,8 @@

viagens_sppo.schedule = every_day_hour_five

SUBSIDIO_SPPO_APURACAO_NAME = "SMTR: Subsídio SPPO Apuração"
with Flow(
SUBSIDIO_SPPO_APURACAO_NAME,
"SMTR: Subsídio SPPO Apuração - Materialização",
code_owners=["rodrigo"],
) as subsidio_sppo_apuracao:
# 1. SETUP #
Expand Down Expand Up @@ -137,7 +136,7 @@

# Rename flow run #
rename_flow_run = rename_current_flow_run_now_time(
prefix=SUBSIDIO_SPPO_APURACAO_NAME + ": ", now_time=run_dates
prefix=subsidio_sppo_apuracao.name + ": ", now_time=run_dates
)

# Set dbt client #
Expand Down
4 changes: 2 additions & 2 deletions pipelines/rj_smtr/registros_ocr_rir/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pipelines.utils.tasks import rename_current_flow_run_now_time

with Flow(
"SMTR - Captura FTP - OCR RIR",
"SMTR: OCR RIR - Captura",
code_owners=[
"caio",
"fernanda",
Expand All @@ -33,7 +33,7 @@
dump = Parameter("dump", default=False)
execution_time = Parameter("execution_time", default=None)
rename_flow = rename_current_flow_run_now_time(
prefix="OCR RIR: ", now_time=get_current_timestamp()
prefix=captura_ocr.name + " ", now_time=get_current_timestamp()
)
# Pipeline
status = get_files_from_ftp(
Expand Down
15 changes: 6 additions & 9 deletions pipelines/rj_smtr/veiculo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@
# Flows #

# flake8: noqa: E501
sppo_licenciamento_captura_name = f"SMTR: Captura - {constants.DATASET_ID.value}.{constants.SPPO_LICENCIAMENTO_TABLE_ID.value}"
with Flow(
sppo_licenciamento_captura_name,
f"SMTR: {constants.DATASET_ID.value} {constants.SPPO_LICENCIAMENTO_TABLE_ID.value} - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as sppo_licenciamento_captura:
timestamp = get_current_timestamp()
Expand All @@ -67,7 +66,7 @@

# Rename flow run
rename_flow_run = rename_current_flow_run_now_time(
prefix=f"{sppo_licenciamento_captura_name} - ", now_time=timestamp
prefix=f"{sppo_licenciamento_captura.name} - ", now_time=timestamp
)

# SETUP #
Expand Down Expand Up @@ -124,9 +123,8 @@
)
sppo_licenciamento_captura.schedule = every_day_hour_seven

sppo_infracao_captura_name = f"SMTR: Captura - {constants.DATASET_ID.value}.{constants.SPPO_INFRACAO_TABLE_ID.value}"
with Flow(
sppo_infracao_captura_name,
f"SMTR: {constants.DATASET_ID.value} {constants.SPPO_INFRACAO_TABLE_ID.value} - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as sppo_infracao_captura:
timestamp = get_current_timestamp()
Expand All @@ -136,7 +134,7 @@

# Rename flow run
rename_flow_run = rename_current_flow_run_now_time(
prefix=f"{sppo_infracao_captura_name} - ", now_time=timestamp
prefix=f"{sppo_infracao_captura.name} - ", now_time=timestamp
)

# SETUP #
Expand Down Expand Up @@ -192,9 +190,8 @@
sppo_infracao_captura.schedule = every_day_hour_seven

# flake8: noqa: E501
sppo_veiculo_dia_name = f"SMTR: Materialização - {constants.DATASET_ID.value}.{constants.SPPO_VEICULO_DIA_TABLE_ID.value}"
with Flow(
sppo_veiculo_dia_name,
f"SMTR: {constants.DATASET_ID.value} {constants.SPPO_VEICULO_DIA_TABLE_ID.value} - Materialização (subflow)",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as sppo_veiculo_dia:
# 1. SETUP #
Expand All @@ -208,7 +205,7 @@

# Rename flow run #
rename_flow_run = rename_current_flow_run_now_time(
prefix=sppo_veiculo_dia_name + ": ",
prefix=sppo_veiculo_dia.name + ": ",
now_time=run_dates,
)

Expand Down

0 comments on commit 907a9b9

Please sign in to comment.