Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][gps_sppo] Cria flow de recaptura de realocações #479

Closed
wants to merge 52 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
e70db4e
initial commit
eng-rodrigocunha Aug 4, 2023
fedf057
fix docs for get_realocacao_recapture_timestamps
eng-rodrigocunha Aug 4, 2023
8bd0fe2
add task in rj_smtr
eng-rodrigocunha Aug 4, 2023
73c6c1d
refactor timestamp param treating in realocacao
eng-rodrigocunha Aug 4, 2023
9605a49
change task importing package
eng-rodrigocunha Aug 4, 2023
bc532ea
Remove task from projeto_subsidio_sppo
eng-rodrigocunha Aug 4, 2023
f2c97d9
update params in realocacao_sppo_recaptura
eng-rodrigocunha Aug 4, 2023
c07497f
fix renaming flow realocacao_sppo_recaptura
eng-rodrigocunha Aug 4, 2023
69116e5
update task get_realocacao_recapture_timestamps
eng-rodrigocunha Aug 4, 2023
bd0e644
update realocacao_sppo_recaptura
eng-rodrigocunha Aug 4, 2023
4ed6ffd
update logs from recapture_timestamps task
eng-rodrigocunha Aug 4, 2023
b5d5c9b
update query_logs + get_current_timestamp
eng-rodrigocunha Aug 4, 2023
947cf7d
update realocacao_sppo flow
eng-rodrigocunha Aug 4, 2023
027cfe3
update get_realocacao_recapture_timestamps task
eng-rodrigocunha Aug 4, 2023
9977408
update get_current_timestamp
eng-rodrigocunha Aug 4, 2023
3137f94
fix get_realocacao_recapture_timestamps
eng-rodrigocunha Aug 4, 2023
9cf4211
fix get_realocacao_recapture_timestamps
eng-rodrigocunha Aug 4, 2023
2f6f001
fix get_realocacao_recapture_timestamps
eng-rodrigocunha Aug 4, 2023
5eaf4a5
fix get_realocacao_recapture_timestamps
eng-rodrigocunha Aug 4, 2023
afc3c0c
fix get_realocacao_recapture_timestamps
eng-rodrigocunha Aug 4, 2023
60d800a
fix query_logs
eng-rodrigocunha Aug 4, 2023
ee9552a
Change agent from realocacao_sppo + recaptura
eng-rodrigocunha Aug 4, 2023
c1ee144
change agent subsidio_sppo_apuracao for testing
eng-rodrigocunha Aug 4, 2023
b2b6986
fix query_logs
eng-rodrigocunha Aug 4, 2023
4755d0d
revert name + agent sppo_recaptura + realocacao
eng-rodrigocunha Aug 4, 2023
6eb9097
revert agent subsidio_sppo_apuracao
eng-rodrigocunha Aug 4, 2023
b251da9
fix datetime filter > current timestamp
eng-rodrigocunha Aug 5, 2023
828592c
fix get_realocacao_recapture_timestamps round
eng-rodrigocunha Aug 5, 2023
9d336e1
update log in get_realocacao_recapture_timestamps
eng-rodrigocunha Aug 5, 2023
c1dbd7a
add get_project_name function
eng-rodrigocunha Aug 5, 2023
b386126
update get_project_name
eng-rodrigocunha Aug 5, 2023
a25b8ef
add get_project_name to realocacao_sppo_recaptura
eng-rodrigocunha Aug 5, 2023
c894df9
Merge branch 'master' into staging/recaptura-realocacao
mergify[bot] Aug 10, 2023
783fc3e
Merge branch 'master' into staging/recaptura-realocacao
mergify[bot] Aug 14, 2023
3aa4604
Merge branch 'master' into staging/recaptura-realocacao
mergify[bot] Aug 16, 2023
e669f9b
Merge branch 'master' into staging/recaptura-realocacao
mergify[bot] Aug 16, 2023
c9f0430
Merge branch 'master' into staging/recaptura-realocacao
mergify[bot] Aug 16, 2023
a5ec01e
Merge branch 'master' into staging/recaptura-realocacao
mergify[bot] Aug 18, 2023
60b3268
Merge branch 'master' into staging/recaptura-realocacao
eng-rodrigocunha Sep 5, 2023
1a5f3c6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 5, 2023
4f764ce
change get_recapture_timestamps task
eng-rodrigocunha Sep 5, 2023
2d1cafb
add function query_logs_func in utils
eng-rodrigocunha Sep 5, 2023
883a13f
update realocacao_sppo_recaptura flow params
eng-rodrigocunha Sep 5, 2023
636255f
Merge branch 'master' into staging/recaptura-realocacao
eng-rodrigocunha Oct 10, 2023
67dffe9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 10, 2023
4be9355
Merge branch 'master' into staging/recaptura-realocacao
mergify[bot] Oct 10, 2023
d535a88
adiciona arredondamento de filtro de datahora
eng-rodrigocunha Oct 10, 2023
e178231
DRY query_logs_func
eng-rodrigocunha Oct 10, 2023
2dd0158
atualiza filtro data
eng-rodrigocunha Oct 11, 2023
b9e5a90
corrige conversão datahora
eng-rodrigocunha Oct 11, 2023
2cbd647
atualiza code_owners
eng-rodrigocunha Oct 11, 2023
ff669bb
localiza filtro datahora
eng-rodrigocunha Oct 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

from prefect import Parameter, case
from prefect.tasks.control_flow import merge
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
Expand Down Expand Up @@ -41,7 +42,10 @@
set_last_run_timestamp,
upload_logs_to_bq,
bq_upload,
check_param,
get_recapture_timestamps,
)

from pipelines.rj_smtr.br_rj_riodejaneiro_onibus_gps.tasks import (
pre_treatment_br_rj_riodejaneiro_onibus_gps,
create_api_url_onibus_gps,
Expand All @@ -54,8 +58,11 @@
every_minute,
every_10_minutes,
)

from pipelines.utils.execute_dbt_model.tasks import run_dbt_model

from pipelines.rj_smtr.utils import get_project_name

# Flows #

with Flow(
Expand All @@ -76,9 +83,20 @@
"table_id", default=constants.GPS_SPPO_REALOCACAO_TREATED_TABLE_ID.value
)
rebuild = Parameter("rebuild", False)
timestamp_param = Parameter("timestamp", None)
recapture = Parameter("recapture", False)
previous_error = Parameter("previous_error", None)

# SETUP
timestamp = get_current_timestamp()
timestamp_cond = check_param(timestamp_param)

with case(timestamp_cond, True):
timestamp_get = get_current_timestamp()

with case(timestamp_cond, False):
timestamp_def = get_current_timestamp(timestamp_param)

timestamp = merge(timestamp_get, timestamp_def)

rename_flow_run = rename_current_flow_run_now_time(
prefix="GPS SPPO - Realocação: ", now_time=timestamp
Expand Down Expand Up @@ -124,6 +142,8 @@
parent_table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value,
error=error,
timestamp=timestamp,
previous_error=previous_error,
recapture=recapture,
)

realocacao_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
Expand All @@ -133,6 +153,48 @@
)
realocacao_sppo.schedule = every_10_minutes

REALOCACAO_SPPO_RECAPTURA_NAME = "SMTR: GPS SPPO - Realocação (recaptura)"
with Flow(
REALOCACAO_SPPO_RECAPTURA_NAME,
code_owners=["rodrigo"], # "caio", "fernanda", "boris"],
) as realocacao_sppo_recaptura:
start_date = Parameter("start_date", default="")
end_date = Parameter("end_date", default="")

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)
PROJECT_NAME = get_project_name(MODE)

current_timestamp = get_current_timestamp()

timestamps = get_recapture_timestamps(
current_timestamp=current_timestamp,
start_date=start_date,
end_date=end_date,
dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value,
)

GPS_SPPO_REALOCACAO_RUN = create_flow_run.map(
flow_name=unmapped(realocacao_sppo.name),
project_name=unmapped(PROJECT_NAME),
run_name=unmapped(realocacao_sppo.name),
parameters=timestamps,
)

wait_for_flow_run.map(
GPS_SPPO_REALOCACAO_RUN,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

realocacao_sppo_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
realocacao_sppo_recaptura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)


with Flow(
"SMTR: GPS SPPO - Materialização",
Expand Down
3 changes: 1 addition & 2 deletions pipelines/rj_smtr/projeto_subsidio_sppo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
get_run_dates,
get_join_dict,
get_previous_date,
check_param,
# get_local_dbt_client,
# set_last_run_timestamp,
)
Expand All @@ -47,8 +48,6 @@
from pipelines.rj_smtr.schedules import every_day_hour_five, every_day_hour_seven
from pipelines.utils.execute_dbt_model.tasks import run_dbt_model

from pipelines.rj_smtr.projeto_subsidio_sppo.tasks import check_param

# Flows #

with Flow(
Expand Down
10 changes: 0 additions & 10 deletions pipelines/rj_smtr/projeto_subsidio_sppo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,3 @@
"""
Tasks for projeto_subsidio_sppo
"""

from prefect import task


@task
def check_param(param: str) -> bool:
"""
Check if param is None
"""
return param is None
Loading