Skip to content

Commit

Permalink
Otimiza e inclui parâmetros de rematerialização no flow `materialize_…
Browse files Browse the repository at this point in the history
…sppo` (#673)

* commit inicial

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* corrige clean_br_rj_riodejaneiro_onibus_gps

* corrige log clean_br_rj_riodejaneiro_onibus_gps

* atualiza changelog

* enriquece log

* Corrige queries

* atualiza tasks de referência

* Altera agentes para produção

* altera nomes de variaveis

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Atualiza docstring

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Rafael Carvalho Pinheiro <[email protected]>
  • Loading branch information
3 people authored Apr 26, 2024
1 parent fe6b2d5 commit 83756bf
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 20 deletions.
12 changes: 11 additions & 1 deletion pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog - br_rj_riodejaneiro_onibus_gps

## [1.0.1] - 2024-04-26

### Adicionado

- Cria task `clean_br_rj_riodejaneiro_onibus_gps` (https://github.com/prefeitura-rio/pipelines/pull/673)

### Alterado

- Otimiza e inclui parâmetros de rematerialização no flow `materialize_sppo` (https://github.com/prefeitura-rio/pipelines/pull/673)

## [1.0.0] - 2024-04-26

### Adicionado
Expand All @@ -12,4 +22,4 @@

### Corrigido

- Corrigido parâmetro `timestamp` do flow `realocacao_sppo` (https://github.com/prefeitura-rio/pipelines/pull/668)
- Corrigido parâmetro `timestamp` do flow `realocacao_sppo` (https://github.com/prefeitura-rio/pipelines/pull/668)
65 changes: 46 additions & 19 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
Flows for br_rj_riodejaneiro_onibus_gps
"""

from prefect import Parameter, case
from prefect import Parameter, case, task
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped
from prefect.tasks.control_flow import merge


# EMD Imports #
Expand Down Expand Up @@ -48,6 +49,7 @@
create_api_url_onibus_gps,
create_api_url_onibus_realocacao,
pre_treatment_br_rj_riodejaneiro_onibus_realocacao,
clean_br_rj_riodejaneiro_onibus_gps,
)

from pipelines.rj_smtr.schedules import (
Expand Down Expand Up @@ -155,6 +157,9 @@
dataset_id = Parameter("dataset_id", default=constants.GPS_SPPO_DATASET_ID.value)
table_id = Parameter("table_id", default=constants.GPS_SPPO_TREATED_TABLE_ID.value)
rebuild = Parameter("rebuild", False)
rematerialization = Parameter("rematerialization", default=False)
date_range_start = Parameter("date_range_start", default=None)
date_range_end = Parameter("date_range_end", default=None)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)
Expand All @@ -165,22 +170,35 @@
# dbt_client = get_local_dbt_client(host="localhost", port=3001)

# Set specific run parameters #
date_range = get_materialization_date_range(
dataset_id=dataset_id,
table_id=table_id,
raw_dataset_id=raw_dataset_id,
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
mode=MODE,
delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
with case(rematerialization, False):
rematerialization_dates_false = date_range = get_materialization_date_range(
dataset_id=dataset_id,
table_id=table_id,
raw_dataset_id=raw_dataset_id,
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
mode=MODE,
delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
)
with case(rematerialization, True):
date_range = {
"date_range_start": date_range_start,
"date_range_end": date_range_end,
}
rematerialization_dates_true = clean_br_rj_riodejaneiro_onibus_gps(date_range)

rematerialization_dates = merge(
rematerialization_dates_true, rematerialization_dates_false
)

dataset_sha = fetch_dataset_sha(
dataset_id=dataset_id,
upstream_tasks=[rematerialization_dates],
)

# Run materialization #
with case(rebuild, True):
RUN = run_dbt_model(
RUN_TRUE = run_dbt_model(
dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
Expand All @@ -189,30 +207,39 @@
_vars=[date_range, dataset_sha],
flags="--full-refresh",
)
set_last_run_timestamp(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
)

with case(rebuild, False):
RUN = run_dbt_model(
RUN_FALSE = run_dbt_model(
dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
exclude="+data_versao_efetiva",
_vars=[date_range, dataset_sha],
upstream=True,
)
set_last_run_timestamp(

RUN = merge(RUN_TRUE, RUN_FALSE)

with case(rematerialization, False):
SET_FALSE = set_last_run_timestamp(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
)

with case(rematerialization, True):
SET_TRUE = task(
lambda: [None],
checkpoint=False,
name="assign_none_to_previous_runs",
)()

SET = merge(SET_TRUE, SET_FALSE)

materialize_sppo.set_reference_tasks([RUN, rematerialization_dates, SET])

materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
materialize_sppo.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
Expand Down
70 changes: 70 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import pandas as pd
from prefect import task
import pendulum
import basedosdados as bd
from typing import Union

# EMD Imports #

Expand Down Expand Up @@ -254,3 +256,71 @@ def pre_treatment_br_rj_riodejaneiro_onibus_gps(
log(f"[CATCHED] Task failed with error: \n{error}", level="error")

return {"data": df_gps, "error": error}


@task
def clean_br_rj_riodejaneiro_onibus_gps(date_range: dict) -> Union[str, None]:
    """
    Delete GPS records inside a date range so they can be rematerialized.

    Removes rows from three tables:
    - `rj-smtr.br_rj_riodejaneiro_onibus_gps.sppo_aux_registros_filtrada`
    - `rj-smtr.br_rj_riodejaneiro_onibus_gps.sppo_aux_registros_realocacao`
    - `rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo`

    Rows are selected by the `data` column (between the range's dates) and
    by `timestamp_gps` (exclusive start, inclusive end).

    Parameters:
    - date_range (dict): must contain the keys ``date_range_start`` and
      ``date_range_end`` (datetime strings).

    Returns:
    - str or None: the formatted traceback if the cleanup failed,
      otherwise None.
    """
    error = None

    # The three targets share the exact same predicate, so build it once
    # instead of repeating it per statement (single point of change).
    tables = [
        "rj-smtr.br_rj_riodejaneiro_onibus_gps.sppo_aux_registros_filtrada",
        "rj-smtr.br_rj_riodejaneiro_onibus_gps.sppo_aux_registros_realocacao",
        "rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo",
    ]

    try:
        # Filter on `data` first (partition pruning), then narrow by
        # timestamp_gps. NOTE(review): the range values are interpolated
        # directly into the SQL; they come from flow Parameters, but
        # validating them (or using BigQuery query parameters) would be
        # safer against injection — confirm upstream guarantees.
        where_clause = f"""
        WHERE
            (data BETWEEN DATE("{date_range['date_range_start']}")
            AND DATE("{date_range['date_range_end']}"))
            AND (timestamp_gps > "{date_range['date_range_start']}"
            AND timestamp_gps <= "{date_range['date_range_end']}")
        """

        q = ";\n".join(
            f"DELETE\n        FROM\n            `{table}`{where_clause}"
            for table in tables
        )
        log(q)

        results = bd.read_sql(q)

        log(
            f"""Cleaned GPS data for
            {date_range['date_range_start']} to {date_range['date_range_end']}\n
            Resulting:\n
            {results}"""
        )
    except Exception:  # pylint: disable = W0703
        error = traceback.format_exc()
        log(f"[CATCHED] Task failed with error: \n{error}", level="error")

    return error

0 comments on commit 83756bf

Please sign in to comment.