Skip to content

Commit

Permalink
Apuração por faixa horária subsídio (#114)
Browse files Browse the repository at this point in the history
* Adiciona subsidio_km_teto na viagens_remuneradas

* cria modelo subsidio_faixa_servico_dia

* cria modelo subsidio_faixa_servico_dia_tipo_viagem

* cria modelo subsidio_penalidade_servico_dia

* cria modelo subsidio_sumario_servico_dia_pagamento

* cria modelo sumario_faixa_servico_dia

* cria modelo sumario_servico_dia_pagamento

* Altera schemas e dbt_project

* Corrige schema dashboard_subsidio_sppo_v2

* Corrige schema financeiro

* Fix config subsidio_faixa_servico_dia

* teste flow

* target dev

* subsidio_km_teto hard coded

* corrige flow para teste

* ignora dashboard_subsidio_sppo para teste

* Altera tipo viagem sem transacao

* altera faixa horaria para teste

* adiciona selectors.yml

* teste selector

* cria task run_dbt_selector

* add set_upstream

* altera import run_dbt_selector

* cria modelo ordem_servico_faixa_horaria

* cria variavel DATA_SUBSIDIO_V9_INICIO

* altera modelos que dependem do ordem_servico_faixa_horaria

* alterações para teste

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* alterações para teste

* alterações para teste

* correção de digitação

* alterações para teste

* altera data dbt projeto_subsidio_sppo

* corrige variavel no modelo

* correção de digitação

* adiciona distinct no select

* correção de digitação

* altera logica dia_seguinte

* correção de digitação

* corrige joins viagem_planejada

* corrige lógica dia_seguinte

* altera selector

* altera refs

* altera critérios de viagens remuneradas

* altera ref subsidio_parametros

* remove ambiguidade

* altera join viagens_remuneradas

* altera refs

* teste dia seguinte

* insere dia_anterior na viagem_planejada

* altera lógica viagem_planejada

* adiciona agrupamento para remover viagens planejadas duplicadas pela feature da faixa horária

* remove datetime_partida do select final e comentários

* altera refs

* altera join para filtrar viagens na faixa planejada

* teste lógica em valores_calculados

* adiciona DATA_SUBSIDIO_V9_INICIO

* adicionado DATA_SUBSIDIO_V3A_INICIO

* altera para prod

* cria dataset planejamento e modelo ordem_servico_faixa_horaria

* remove hardcode e altera case

* adiciona filtro feed_version e tipo_os na CTE dia_anterior

* altera nome da coluna km_apurada_sem_passageiro para km_apurada_sem_transacao

* corrige lógica viagens_planejadas

* corrige select adicionando as colunas inicio_periodo e fim_periodo

* adiciona colunas partidas, faixa_horaria_inicio e faixa_horaria_fim

* remove coluna valor_judicial de subsidio_faixa_servico_dia_tipo_viagem

* adiciona coluna indicador_penalidade_judicial

* adiciona coluna indicador_penalidade_judicial, altera viagens_planejadas e viagens_planejadas_ida_volta

* Altera ref e select final

* adiciona coluna indicador_penalidade_judicial

* altera nome da coluna partidas para partidas_total_planejada

* remove valor_judicial

* adiciona trunc nas colunas e indicador_viagem_remunerada no select final

* altera lógica das colunas com valores

* altera nome dos selectors

* altera nome de indicador_viagem_remunerada para indicador_viagem_dentro_limite

* altera sinal para valor_acima_limite

* adiciona as colunas versao e datetime_ultima_atualizacao

* altera teste_sumario_servico_dia_tipo_soma_km

* cria tasks check_date_in_range e split_date_range

* altera lógica da apuração caso houver datas da apuracao_subsidio_v8 e apuracao_subsidio_v9

* altera teste_sumario_servico_dia_tipo_soma_km e cria constantes SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID e SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID_V2

* corrige check_date_in_range e split_date_range, e altera subsidio_data_quality_check

* corrige lógica subsidio_sppo_apuracao

* altera lógica para rodar sem faixa horária

* altera lógica conforme DATA_SUBSIDIO_V9_INICIO

* add changelogs

* cria financeiro_staging e move modelo subsidio_faixa_servico_dia

* adiciona materialized e incremental_strategy do dataset planejamento

* cria tasks get_posterior_date, check_date_in_range e split_date_range

* adiciona parametro run_d0 e ajusta chamada das tasks

* adiciona GreaterThanOrEqual e remove check_start_date, check_date_in_range e split_date_range

* altera lógica conforme DATA_SUBSIDIO_V9_INICIO

* altera modelo para pegar os dados de staging

* adiciona feed_start_date no join

* materializa 1 dia

* altera changelog

* materializa tabela

* add changelog

* ajusta DATA_SUBSIDIO_V9_INICIO

* altera refs e remove comentários

* atualiza descrição da task get_posterior_date

* remove prefixo apuracaoFH

* altera partidas por partidas_total_planejada

* remove comentários e altera refs

* adiciona tipo_os no join

* adiciona coluna tipo_os

* altera partidas por partidas_total_planejada

* materializa d-1

* altera refs e filtro da CTE shapes

* altera subquery com jinja

* altera tipagem das tasks get_posterior_date e split_date_range

* altera CTE servico_faixa_km_apuracao para funcionar antes e depois da vigencia da faixa horaria

* corrige comentários das refs

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Rodrigo Cunha <[email protected]>
Co-authored-by: Rodrigo Cunha <[email protected]>
  • Loading branch information
5 people authored Sep 10, 2024
1 parent f55e2b1 commit c5ca334
Show file tree
Hide file tree
Showing 44 changed files with 1,828 additions and 113 deletions.
13 changes: 13 additions & 0 deletions pipelines/migration/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Changelog - migration

## [1.0.1] - 2024-08-29

### Adicionado

- Adicionadas as tasks `get_posterior_date`, `check_date_in_range` e `split_date_range` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114)

## [1.0.0] - 2024-08-29

### Adicionado

- Adiciona task `run_dbt_selector` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114)
16 changes: 16 additions & 0 deletions pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Changelog - projeto_subsidio_sppo

## [1.0.5] - 2024-08-29

### Alterado

- Alterado `teste_sumario_servico_dia_tipo_soma_km` para considerar tabela de acordo com o período (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114)

- Alterado `indicador_viagem_remunerada` para `indicador_viagem_dentro_limite` no `SUBSIDIO_SPPO_DATA_CHECKS_POS_LIST` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114)

- Alterada a lógica do flow `subsidio_sppo_apuracao` para utilizar os selectors `apuracao_subsidio_v8` e `apuracao_subsidio_v9` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114)

- Alterada a lógica da task `subsidio_data_quality_check` para considerar tabela de acordo com o período (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114)

### Adicionado

- Adiciona parâmetro run_d0 para materializar D+0 as tabelas `viagem_planejada` e `subsidio_data_versao_efetiva` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114)

## [1.0.4] - 2024-08-19

### Adicionado
Expand Down
13 changes: 10 additions & 3 deletions pipelines/migration/projeto_subsidio_sppo/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,25 @@ class constants(Enum): # pylint: disable=c0103
Constant values for rj_smtr projeto_subsidio_sppo
"""

SUBSIDIO_SPPO_FINANCEIRO_DATASET_ID = "financeiro"

SUBSIDIO_SPPO_DATASET_ID = "projeto_subsidio_sppo"
SUBSIDIO_SPPO_SECRET_PATH = "projeto_subsidio_sppo"
SUBSIDIO_SPPO_TABLE_ID = "viagem_completa"
SUBSIDIO_SPPO_CODE_OWNERS = ["dados_smtr"]

SUBSIDIO_SPPO_V2_DATASET_ID = "subsidio"
# Feature Apuração por faixa horária
DATA_SUBSIDIO_V9_INICIO = "2024-08-16"

# SUBSÍDIO DASHBOARD
# flake8: noqa: E501
SUBSIDIO_SPPO_DASHBOARD_DATASET_ID = "dashboard_subsidio_sppo"
SUBSIDIO_SPPO_DASHBOARD_V2_DATASET_ID = "dashboard_subsidio_sppo_v2"
SUBSIDIO_SPPO_DASHBOARD_STAGING_DATASET_ID = "dashboard_subsidio_sppo_staging"
SUBSIDIO_SPPO_DASHBOARD_TABLE_ID = "sumario_servico_dia"
SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID = "sumario_servico_dia_tipo"
SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID_V2 = "sumario_servico_dia_pagamento"
SUBSIDIO_SPPO_DATA_CHECKS_PARAMS = {
"check_trips_processing": {
"query": """SELECT
Expand Down Expand Up @@ -415,7 +422,7 @@ class constants(Enum): # pylint: disable=c0103
km_apurada,
ROUND(COALESCE(km_apurada_registrado_com_ar_inoperante,0) + COALESCE(km_apurada_n_licenciado,0) + COALESCE(km_apurada_autuado_ar_inoperante,0) + COALESCE(km_apurada_autuado_seguranca,0) + COALESCE(km_apurada_autuado_limpezaequipamento,0) + COALESCE(km_apurada_licenciado_sem_ar_n_autuado,0) + COALESCE(km_apurada_licenciado_com_ar_n_autuado,0) + COALESCE(km_apurada_n_vistoriado, 0) + COALESCE(km_apurada_sem_transacao, 0),2) AS km_apurada2
FROM
`rj-smtr.dashboard_subsidio_sppo.sumario_servico_dia_tipo`
`rj-smtr`.`{dataset_id_v2}`.`{table_id_v2}`
WHERE
DATA BETWEEN DATE("{start_timestamp}")
AND DATE("{end_timestamp}"))
Expand Down Expand Up @@ -685,8 +692,8 @@ class constants(Enum): # pylint: disable=c0103
"expression": "id_viagem IS NOT NULL",
},
"Todas viagens possuem indicador de viagem remunerada não nulo e verdadeiro/falso": {
"expression": "indicador_viagem_remunerada IS NOT NULL\
AND indicador_viagem_remunerada IN (TRUE, FALSE)",
"expression": "indicador_viagem_dentro_limite IS NOT NULL\
AND indicador_viagem_dentro_limite IN (TRUE, FALSE)",
},
"Todas viagens com distância planejada não nula e maior ou igual a zero": {
"expression": "distancia_planejada IS NOT NULL AND distancia_planejada >= 0",
Expand Down
109 changes: 84 additions & 25 deletions pipelines/migration/projeto_subsidio_sppo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.control_flow import merge
from prefect.tasks.core.operators import GreaterThanOrEqual
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped
from prefeitura_rio.pipelines_utils.custom import Flow
Expand All @@ -22,28 +23,32 @@

from pipelines.constants import constants as smtr_constants
from pipelines.migration.projeto_subsidio_sppo.constants import constants

# from pipelines.materialize_to_datario.flows import (
# smtr_materialize_to_datario_viagem_sppo_flow,
# )
from pipelines.migration.projeto_subsidio_sppo.tasks import (
check_param,
subsidio_data_quality_check,
)
from pipelines.migration.tasks import (
check_date_in_range,
fetch_dataset_sha,
get_current_flow_labels,
get_current_flow_mode,
get_flow_project,
get_join_dict,
get_now_date,
get_posterior_date,
get_previous_date,
get_run_dates,
rename_current_flow_run_now_time,
run_dbt_model,
split_date_range,
)
from pipelines.migration.veiculo.flows import sppo_veiculo_dia
from pipelines.schedules import every_day_hour_five, every_day_hour_seven_minute_five
from pipelines.tasks import run_dbt_selector

# from pipelines.materialize_to_datario.flows import (
# smtr_materialize_to_datario_viagem_sppo_flow,
# )

# EMD Imports #

Expand All @@ -64,6 +69,7 @@
# Get default parameters #
date_range_start = Parameter("date_range_start", default=False)
date_range_end = Parameter("date_range_end", default=False)
run_d0 = Parameter("run_d0", default=True)

run_dates = get_run_dates(date_range_start, date_range_end)

Expand Down Expand Up @@ -94,6 +100,14 @@
_vars=_vars,
)

with case(run_d0, True):
date_d0 = get_posterior_date(1)
RUN_2 = run_dbt_model(
dataset_id=constants.SUBSIDIO_SPPO_DATASET_ID.value,
table_id="subsidio_data_versao_efetiva viagem_planejada",
_vars={"run_date": date_d0, "version": dataset_sha},
)

viagens_sppo.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
viagens_sppo.run_config = KubernetesRun(
image=smtr_constants.DOCKER_IMAGE.value, labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value]
Expand Down Expand Up @@ -157,7 +171,8 @@
dataset_id=constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value,
)

_vars = {"start_date": start_date, "end_date": end_date}
dates = [{"start_date": start_date, "end_date": end_date}]
_vars = get_join_dict(dict_list=dates, new_dict=dataset_sha)[0]

# 2. MATERIALIZE DATA #
with case(test_only, False):
Expand Down Expand Up @@ -200,28 +215,72 @@

with case(SUBSIDIO_SPPO_DATA_QUALITY_PRE, True):
# 4. CALCULATE #
SUBSIDIO_SPPO_STAGING_RUN = run_dbt_model(
# dbt_client=dbt_client,
dataset_id=constants.SUBSIDIO_SPPO_DASHBOARD_STAGING_DATASET_ID.value,
_vars=_vars,
upstream_tasks=[SUBSIDIO_SPPO_DATA_QUALITY_PRE],
date_in_range = check_date_in_range(
_vars["start_date"], _vars["end_date"], constants.DATA_SUBSIDIO_V9_INICIO.value
)

SUBSIDIO_SPPO_APURACAO_RUN = run_dbt_model(
# dbt_client=dbt_client,
dataset_id=constants.SUBSIDIO_SPPO_V2_DATASET_ID.value
+ " "
+ constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value,
_vars=_vars,
upstream_tasks=[SUBSIDIO_SPPO_STAGING_RUN],
)

# 5. POST-DATA QUALITY CHECK #
SUBSIDIO_SPPO_DATA_QUALITY_POS = subsidio_data_quality_check(
mode="pos",
params=_vars,
upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN],
)
with case(date_in_range, True):
date_intervals = split_date_range(
_vars["start_date"], _vars["end_date"], constants.DATA_SUBSIDIO_V9_INICIO.value
)

dbt_vars_1 = get_join_dict(
dict_list=[_vars], new_dict=date_intervals["first_range"]
)[0]

SUBSIDIO_SPPO_APURACAO_RUN = run_dbt_selector(
selector_name="apuracao_subsidio_v8",
_vars=dbt_vars_1,
)

# POST-DATA QUALITY CHECK #
SUBSIDIO_SPPO_DATA_QUALITY_POS = subsidio_data_quality_check(
mode="pos",
params=dbt_vars_1,
upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN],
)

dbt_vars_2 = get_join_dict(
dict_list=[dbt_vars_1],
new_dict=date_intervals["second_range"],
upstream_tasks=[SUBSIDIO_SPPO_DATA_QUALITY_POS],
)[0]

SUBSIDIO_SPPO_APURACAO_RUN_2 = run_dbt_selector(
selector_name="apuracao_subsidio_v9",
_vars=dbt_vars_2,
upstream_tasks=[dbt_vars_2],
)

# POST-DATA QUALITY CHECK #
SUBSIDIO_SPPO_DATA_QUALITY_POS_2 = subsidio_data_quality_check(
mode="pos",
params=dbt_vars_2,
upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN_2],
)

with case(date_in_range, False):
gte = GreaterThanOrEqual()
gte_result = gte.run(_vars["start_date"], constants.DATA_SUBSIDIO_V9_INICIO.value)

with case(gte_result, False):
SUBSIDIO_SPPO_APURACAO_RUN = run_dbt_selector(
selector_name="apuracao_subsidio_v8",
_vars=_vars,
)

with case(gte_result, True):
SUBSIDIO_SPPO_APURACAO_RUN = run_dbt_selector(
selector_name="apuracao_subsidio_v9",
_vars=_vars,
)

# POST-DATA QUALITY CHECK #
SUBSIDIO_SPPO_DATA_QUALITY_POS = subsidio_data_quality_check(
mode="pos",
params=_vars,
upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN],
)

# TODO: test upstream_tasks=[SUBSIDIO_SPPO_DASHBOARD_RUN]
# 6. PUBLISH #
Expand Down
15 changes: 14 additions & 1 deletion pipelines/migration/projeto_subsidio_sppo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datetime import datetime, timedelta

from prefect import task
from prefect.tasks.core.operators import GreaterThanOrEqual

from pipelines.constants import constants as smtr_constants
from pipelines.migration.projeto_subsidio_sppo.constants import constants
Expand Down Expand Up @@ -60,9 +61,20 @@ def subsidio_data_quality_check(
).strftime("%Y-%m-%d %H:%M:%S"),
}

gte = GreaterThanOrEqual()
gte_result = gte.run(params["start_date"], constants.DATA_SUBSIDIO_V9_INICIO.value)

if mode == "pos":
request_params["end_timestamp"] = f"""{params["end_date"]} 00:00:00"""
request_params["dataset_id"] = constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value
if gte_result:
request_params["dataset_id_v2"] = constants.SUBSIDIO_SPPO_DASHBOARD_V2_DATASET_ID.value
request_params[
"table_id_v2"
] = constants.SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID_V2.value
else:
request_params["dataset_id_v2"] = constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value
request_params["table_id_v2"] = constants.SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID.value

checks_list = (
constants.SUBSIDIO_SPPO_DATA_CHECKS_PRE_LIST.value
Expand Down Expand Up @@ -132,6 +144,7 @@ def subsidio_data_quality_check(
else ":warning: **Status:** Testes falharam. Necessidade de revisão dos dados finais!\n"
)

# fmt: off
if not test_check:
at_code_owners = [
f' - <@{smtr_constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["user_id"]}>\n'
Expand All @@ -145,7 +158,7 @@ def subsidio_data_quality_check(
]

formatted_messages.extend(at_code_owners)

# fmt: on
format_send_discord_message(formatted_messages, webhook_url)

return test_check
42 changes: 42 additions & 0 deletions pipelines/migration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,16 @@ def get_previous_date(days):
return now.to_date_string()


@task(checkpoint=False)
def get_posterior_date(days: int) -> str:
"""
Returns the date of {days} days from now in YYYY-MM-DD.
"""
now = pendulum.now(pendulum.timezone("America/Sao_Paulo")).add(days=days)

return now.to_date_string()


###############
#
# Pretreat data
Expand Down Expand Up @@ -1824,3 +1834,35 @@ def get_current_flow_mode() -> str:
@task
def get_flow_project():
return prefect.context.get("project_name")


@task
def check_date_in_range(start_date: str, end_date: str, comparison_date: str) -> bool:
"""
Check if comparison_date is between start_date and end_date.
"""
start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
end_date = datetime.strptime(end_date, "%Y-%m-%d").date()
comparison_date = datetime.strptime(comparison_date, "%Y-%m-%d").date()

return start_date < comparison_date <= end_date


@task
def split_date_range(start_date: str, end_date: str, comparison_date: str) -> dict:
"""
Split the date range into two ranges on comparison_date.
Returns the first and second ranges.
"""
comparison_date = datetime.strptime(comparison_date, "%Y-%m-%d").date()

return {
"first_range": {
"start_date": start_date,
"end_date": (comparison_date - timedelta(days=1)).strftime("%Y-%m-%d"),
},
"second_range": {
"start_date": comparison_date.strftime("%Y-%m-%d"),
"end_date": end_date,
},
}
Loading

0 comments on commit c5ca334

Please sign in to comment.