From c5ca3346d384c346876d0a25ea65968c125a050e Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 10 Sep 2024 18:51:48 -0300 Subject: [PATCH] =?UTF-8?q?Apura=C3=A7=C3=A3o=20por=20faixa=20hor=C3=A1ria?= =?UTF-8?q?=20subs=C3=ADdio=20(#114)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adiciona subsidio_km_teto na viagens_remuneradas * cria modelo subsidio_faixa_servico_dia * cria modelo subsidio_faixa_servico_dia_tipo_viagem * cria modelo subsidio_penalidade_servico_dia * cria modelo subsidio_sumario_servico_dia_pagamento * cria modelo sumario_faixa_servico_dia * cria modelo sumario_servico_dia_pagamento * Altera schemas e dbt_project * Corrige schema dashboard_subsidio_sppo_v2 * Corrige schema financeiro * Fix config subsidio_faixa_servico_dia * teste flow * target dev * subsidio_km_teto hard coded * corrige flow para teste * ignora dashboard_subsidio_sppo para teste * Altera tipo viagem sem transacao * altera faixa horaria para teste * adiciona selectors.yml * teste selector * cria task run_dbt_selector * add set_upstream * altera import run_dbt_selector * cria modelo ordem_servico_faixa_horaria * cria variavel DATA_SUBSIDIO_V9_INICIO * altera modelos que dependem do ordem_servico_faixa_horaria * alterações para teste * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * alterações para teste * alterações para teste * correção de digitação * alterações para teste * altera data dbt projeto_subsidio_sppo * corrige variavel no modelo * correção de digitação * adiciona distinct no select * correção de digitação * altera logica dia_seguinte * correção de digitação * corrige joins viagem_planejada * corrige lógica dia_seguinte * altera selector * altera refs * altera critérios de viagens remuneradas * altera ref subsidio_parametros * remove ambiguidade * altera join viagens_remuneradas * altera refs * teste dia seguinte * insere dia_anterior na viagem_planejada * altera lógica viagem_planejada * adiciona agrupamento para remover viagens planejadas duplicadas pela feature da faixa horária * remove datetime_partida do select final e comentários * altera refs * altera join para filtrar viagens na faixa planejada * teste lógica em valores_calculados * adiciona DATA_SUBSIDIO_V9_INICIO * adicionado DATA_SUBSIDIO_V3A_INICIO * altera para prod * cria dataset planejamento e modelo ordem_servico_faixa_horaria * remove hardcode e altera case * adiciona filtro feed_version e tipo_os na CTE dia_anterior * altera nome da coluna km_apurada_sem_passageiro para km_apurada_sem_transacao * corrige lógica viagens_planejadas * corrige select adicionando as colunas inicio_periodo e fim_periodo * adiciona colunas partidas, faixa_horaria_inicio e faixa_horaria_fim * remove coluna valor_judicial de subsidio_faixa_servico_dia_tipo_viagem * adiciona coluna indicador_penalidade_judicial * adiciona coluna indicador_penalidade_judicial, altera viagens_planejadas e viagens_planejadas_ida_volta * Altera ref e select final * adiciona coluna indicador_penalidade_judicial * altera nome da coluna partidas para partidas_total_planejada * remove valor_judicial * adiciona trunc nas colunas e indicador_viagem_remunerada no select final * altera lógica das colunas com valores * altera nome dos selectors * altera nome de indicador_viagem_remunerada para indicador_viagem_dentro_limite * altera sinal para valor_acima_limite * adiciona as colunas versao e datetime_ultima_atualizacao * altera teste_sumario_servico_dia_tipo_soma_km * cria tasks check_date_in_range e split_date_range * altera lógica da apuração caso houver datas da apuracao_subsidio_v8 e apuracao_subsidio_v9 * altera teste_sumario_servico_dia_tipo_soma_km e cria constantes SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID e SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID_V2 * corrige check_date_in_range e split_date_range, e altera subsidio_data_quality_check * corrige lógica subsidio_sppo_apuracao * altera lógica para rodar sem faixa horária * altera lógica conforme DATA_SUBSIDIO_V9_INICIO * add changelogs * cria financeiro_staging e move modelo subsidio_faixa_servico_dia * adiciona materialized e incremental_strategy do dataset planejamento * cria tasks get_posterior_date, check_date_in_range e split_date_range * adiciona parametro run_d0 e ajusta chamada das tasks * adiciona GreaterThanOrEqual e remove check_start_date, check_date_in_range e split_date_range * altera lógica conforme DATA_SUBSIDIO_V9_INICIO * altera modelo para pegar os dados de staging * adiciona feed_start_date no join * materializa 1 dia * altera changelog * materializa tabela * add changelog * ajusta DATA_SUBSIDIO_V9_INICIO * altera refs e remove comentários * atualiza descrição da task get_posterior_date * remove prefixo apuracaoFH * altera partidas por partidas_total_planejada * remove comentários e altera refs * adiciona tipo_os no join * adiciona coluna tipo_os * altera partidas por partidas_total_planejada * materializa d-1 * altera refs e filtro da CTE shapes * altera subquery com jinja * altera tipagem das tasks get_posterior_date e split_date_range * altera CTE servico_faixa_km_apuracao para funcionar antes e depois da vigencia da faixa horaria * corrige comentários das refs --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Rodrigo Cunha Co-authored-by: Rodrigo Cunha <66736583+eng-rodrigocunha@users.noreply.github.com> --- pipelines/migration/CHANGELOG.md | 13 + .../projeto_subsidio_sppo/CHANGELOG.md | 16 ++ .../projeto_subsidio_sppo/constants.py | 13 +- .../migration/projeto_subsidio_sppo/flows.py | 109 +++++++-- .../migration/projeto_subsidio_sppo/tasks.py | 15 +- pipelines/migration/tasks.py | 42 ++++ pipelines/tasks.py | 55 +++++ queries/dbt_project.yml | 19 ++ .../dashboard_subsidio_sppo/CHANGELOG.md | 8 + .../models/dashboard_subsidio_sppo/schema.yml | 6 +- .../dashboard_subsidio_sppo/sumario_dia.sql | 10 +- .../sumario_servico_dia.sql | 8 +- .../sumario_servico_dia_tipo.sql | 6 +- .../viagens_remuneradas.sql | 122 +++++----- .../CHANGELOG.md | 7 + .../subsidio_valor_km_tipo_viagem.sql | 1 + .../dashboard_subsidio_sppo_v2/CHANGELOG.md | 7 + .../dashboard_subsidio_sppo_v2/schema.yml | 117 +++++++++ .../sumario_faixa_servico_dia.sql | 111 +++++++++ .../sumario_servico_dia_pagamento.sql | 97 ++++++++ queries/models/docs.md | 20 ++ queries/models/financeiro/CHANGELOG.md | 7 + queries/models/financeiro/schema.yaml | 75 ++++++ .../models/financeiro/staging/CHANGELOG.md | 7 + queries/models/financeiro/staging/schema.yaml | 30 +++ .../staging/subsidio_faixa_servico_dia.sql | 82 +++++++ ...subsidio_faixa_servico_dia_tipo_viagem.sql | 159 ++++++++++++ .../subsidio_penalidade_servico_dia.sql | 56 +++++ ...subsidio_sumario_servico_dia_pagamento.sql | 118 +++++++++ queries/models/gtfs/CHANGELOG.md | 4 + .../gtfs/ordem_servico_trips_shapes_gtfs.sql | 22 +- queries/models/planejamento/CHANGELOG.md | 7 + .../ordem_servico_faixa_horaria.sql | 163 +++++++++++++ queries/models/planejamento/schema.yml | 18 ++ .../models/projeto_subsidio_sppo/CHANGELOG.md | 8 + .../aux_registros_status_trajeto.sql | 105 ++++++++ .../models/projeto_subsidio_sppo/schema.yml | 6 + .../subsidio_data_versao_efetiva.sql | 4 +- .../viagem_planejada.sql | 228 ++++++++++++++++-- .../CHANGELOG.md | 6 + .../balanco_servico_dia.sql | 2 +- .../balanco_servico_dia_pos_gt.sql | 2 +- queries/models/veiculo/sppo_veiculo_dia.sql | 4 +- queries/selectors.yml | 26 ++ 44 files changed, 1828 insertions(+), 113 deletions(-) create mode 100644 pipelines/migration/CHANGELOG.md create mode 100644 queries/models/dashboard_subsidio_sppo_staging/CHANGELOG.md create mode 100644 queries/models/dashboard_subsidio_sppo_v2/CHANGELOG.md create mode 100644 queries/models/dashboard_subsidio_sppo_v2/schema.yml create mode 100644 queries/models/dashboard_subsidio_sppo_v2/sumario_faixa_servico_dia.sql create mode 100644 queries/models/dashboard_subsidio_sppo_v2/sumario_servico_dia_pagamento.sql create mode 100644 queries/models/financeiro/CHANGELOG.md create mode 100644 queries/models/financeiro/schema.yaml create mode 100644 queries/models/financeiro/staging/CHANGELOG.md create mode 100644 queries/models/financeiro/staging/schema.yaml create mode 100644 queries/models/financeiro/staging/subsidio_faixa_servico_dia.sql create mode 100644 queries/models/financeiro/subsidio_faixa_servico_dia_tipo_viagem.sql create mode 100644 queries/models/financeiro/subsidio_penalidade_servico_dia.sql create mode 100644 queries/models/financeiro/subsidio_sumario_servico_dia_pagamento.sql create mode 100644 queries/models/planejamento/CHANGELOG.md create mode 100644 queries/models/planejamento/ordem_servico_faixa_horaria.sql create mode 100644 queries/models/planejamento/schema.yml create mode 100644 queries/selectors.yml diff --git a/pipelines/migration/CHANGELOG.md b/pipelines/migration/CHANGELOG.md new file mode 100644 index 000000000..828c40597 --- /dev/null +++ b/pipelines/migration/CHANGELOG.md @@ -0,0 +1,13 @@ +# Changelog - migration + +## [1.0.1] - 2024-08-29 + +### Adicionado + +- Adicionadas as tasks `get_posterior_date`, `check_date_in_range` e `split_date_range` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + +## [1.0.0] - 2024-08-29 + +### Adicionado + +- Adiciona task `run_dbt_selector` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) \ No newline at end of file diff --git a/pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md b/pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md index 25a4ee5a7..65d0a4e07 100644 --- a/pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md +++ b/pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md @@ -1,5 +1,21 @@ # Changelog - projeto_subsidio_sppo +## [1.0.5] - 2024-08-29 + +### Alterado + +- Alterado `teste_sumario_servico_dia_tipo_soma_km` para considerar tabela de acordo com o período (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + +- Alterado `indicador_viagem_remunerada` para `indicador_viagem_dentro_limite` no `SUBSIDIO_SPPO_DATA_CHECKS_POS_LIST` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + +- Alterada a lógica do flow `subsidio_sppo_apuracao` para utilizar os selectors `apuracao_subsidio_v8` e `apuracao_subsidio_v9` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + +- Alterada a lógica da task `subsidio_data_quality_check` para considerar tabela de acordo com o período (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + +### Adicionado + +- Adiciona parâmetro run_d0 para materializar D+0 as tabelas `viagem_planejada` e `subsidio_data_versao_efetiva` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + ## [1.0.4] - 2024-08-19 ### Adicionado diff --git a/pipelines/migration/projeto_subsidio_sppo/constants.py b/pipelines/migration/projeto_subsidio_sppo/constants.py index 00a87c906..a892779df 100644 --- a/pipelines/migration/projeto_subsidio_sppo/constants.py +++ b/pipelines/migration/projeto_subsidio_sppo/constants.py @@ -13,18 +13,25 @@ class constants(Enum): # pylint: disable=c0103 Constant values for rj_smtr projeto_subsidio_sppo """ + SUBSIDIO_SPPO_FINANCEIRO_DATASET_ID = "financeiro" + SUBSIDIO_SPPO_DATASET_ID = "projeto_subsidio_sppo" SUBSIDIO_SPPO_SECRET_PATH = "projeto_subsidio_sppo" SUBSIDIO_SPPO_TABLE_ID = "viagem_completa" SUBSIDIO_SPPO_CODE_OWNERS = ["dados_smtr"] SUBSIDIO_SPPO_V2_DATASET_ID = "subsidio" + # Feature Apuração por faixa horária + DATA_SUBSIDIO_V9_INICIO = "2024-08-16" # SUBSÍDIO DASHBOARD # flake8: noqa: E501 SUBSIDIO_SPPO_DASHBOARD_DATASET_ID = "dashboard_subsidio_sppo" + SUBSIDIO_SPPO_DASHBOARD_V2_DATASET_ID = "dashboard_subsidio_sppo_v2" SUBSIDIO_SPPO_DASHBOARD_STAGING_DATASET_ID = "dashboard_subsidio_sppo_staging" SUBSIDIO_SPPO_DASHBOARD_TABLE_ID = "sumario_servico_dia" + SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID = "sumario_servico_dia_tipo" + SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID_V2 = "sumario_servico_dia_pagamento" SUBSIDIO_SPPO_DATA_CHECKS_PARAMS = { "check_trips_processing": { "query": """SELECT @@ -415,7 +422,7 @@ class constants(Enum): # pylint: disable=c0103 km_apurada, ROUND(COALESCE(km_apurada_registrado_com_ar_inoperante,0) + COALESCE(km_apurada_n_licenciado,0) + COALESCE(km_apurada_autuado_ar_inoperante,0) + COALESCE(km_apurada_autuado_seguranca,0) + COALESCE(km_apurada_autuado_limpezaequipamento,0) + COALESCE(km_apurada_licenciado_sem_ar_n_autuado,0) + COALESCE(km_apurada_licenciado_com_ar_n_autuado,0) + COALESCE(km_apurada_n_vistoriado, 0) + COALESCE(km_apurada_sem_transacao, 0),2) AS km_apurada2 FROM - `rj-smtr.dashboard_subsidio_sppo.sumario_servico_dia_tipo` + `rj-smtr`.`{dataset_id_v2}`.`{table_id_v2}` WHERE DATA BETWEEN DATE("{start_timestamp}") AND DATE("{end_timestamp}")) @@ -685,8 +692,8 @@ class constants(Enum): # pylint: disable=c0103 "expression": "id_viagem IS NOT NULL", }, "Todas viagens possuem indicador de viagem remunerada não nulo e verdadeiro/falso": { - "expression": "indicador_viagem_remunerada IS NOT NULL\ - AND indicador_viagem_remunerada IN (TRUE, FALSE)", + "expression": "indicador_viagem_dentro_limite IS NOT NULL\ + AND indicador_viagem_dentro_limite IN (TRUE, FALSE)", }, "Todas viagens com distância planejada não nula e maior ou igual a zero": { "expression": "distancia_planejada IS NOT NULL AND distancia_planejada >= 0", diff --git a/pipelines/migration/projeto_subsidio_sppo/flows.py b/pipelines/migration/projeto_subsidio_sppo/flows.py index d23a08ec7..bf053a913 100644 --- a/pipelines/migration/projeto_subsidio_sppo/flows.py +++ b/pipelines/migration/projeto_subsidio_sppo/flows.py @@ -10,6 +10,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect.tasks.control_flow import merge +from prefect.tasks.core.operators import GreaterThanOrEqual from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from prefect.utilities.edges import unmapped from prefeitura_rio.pipelines_utils.custom import Flow @@ -22,28 +23,32 @@ from pipelines.constants import constants as smtr_constants from pipelines.migration.projeto_subsidio_sppo.constants import constants - -# from pipelines.materialize_to_datario.flows import ( -# smtr_materialize_to_datario_viagem_sppo_flow, -# ) from pipelines.migration.projeto_subsidio_sppo.tasks import ( check_param, subsidio_data_quality_check, ) from pipelines.migration.tasks import ( + check_date_in_range, fetch_dataset_sha, get_current_flow_labels, get_current_flow_mode, get_flow_project, get_join_dict, get_now_date, + get_posterior_date, get_previous_date, get_run_dates, rename_current_flow_run_now_time, run_dbt_model, + split_date_range, ) from pipelines.migration.veiculo.flows import sppo_veiculo_dia from pipelines.schedules import every_day_hour_five, every_day_hour_seven_minute_five +from pipelines.tasks import run_dbt_selector + +# from pipelines.materialize_to_datario.flows import ( +# smtr_materialize_to_datario_viagem_sppo_flow, +# ) # EMD Imports # @@ -64,6 +69,7 @@ # Get default parameters # date_range_start = Parameter("date_range_start", default=False) date_range_end = Parameter("date_range_end", default=False) + run_d0 = Parameter("run_d0", default=True) run_dates = get_run_dates(date_range_start, date_range_end) @@ -94,6 +100,14 @@ _vars=_vars, ) + with case(run_d0, True): + date_d0 = get_posterior_date(1) + RUN_2 = run_dbt_model( + dataset_id=constants.SUBSIDIO_SPPO_DATASET_ID.value, + table_id="subsidio_data_versao_efetiva viagem_planejada", + _vars={"run_date": date_d0, "version": dataset_sha}, + ) + viagens_sppo.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value) viagens_sppo.run_config = KubernetesRun( image=smtr_constants.DOCKER_IMAGE.value, labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value] @@ -157,7 +171,8 @@ dataset_id=constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value, ) - _vars = {"start_date": start_date, "end_date": end_date} + dates = [{"start_date": start_date, "end_date": end_date}] + _vars = get_join_dict(dict_list=dates, new_dict=dataset_sha)[0] # 2. MATERIALIZE DATA # with case(test_only, False): @@ -200,28 +215,72 @@ with case(SUBSIDIO_SPPO_DATA_QUALITY_PRE, True): # 4. CALCULATE # - SUBSIDIO_SPPO_STAGING_RUN = run_dbt_model( - # dbt_client=dbt_client, - dataset_id=constants.SUBSIDIO_SPPO_DASHBOARD_STAGING_DATASET_ID.value, - _vars=_vars, - upstream_tasks=[SUBSIDIO_SPPO_DATA_QUALITY_PRE], + date_in_range = check_date_in_range( + _vars["start_date"], _vars["end_date"], constants.DATA_SUBSIDIO_V9_INICIO.value ) - SUBSIDIO_SPPO_APURACAO_RUN = run_dbt_model( - # dbt_client=dbt_client, - dataset_id=constants.SUBSIDIO_SPPO_V2_DATASET_ID.value - + " " - + constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value, - _vars=_vars, - upstream_tasks=[SUBSIDIO_SPPO_STAGING_RUN], - ) - - # 5. POST-DATA QUALITY CHECK # - SUBSIDIO_SPPO_DATA_QUALITY_POS = subsidio_data_quality_check( - mode="pos", - params=_vars, - upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN], - ) + with case(date_in_range, True): + date_intervals = split_date_range( + _vars["start_date"], _vars["end_date"], constants.DATA_SUBSIDIO_V9_INICIO.value + ) + + dbt_vars_1 = get_join_dict( + dict_list=[_vars], new_dict=date_intervals["first_range"] + )[0] + + SUBSIDIO_SPPO_APURACAO_RUN = run_dbt_selector( + selector_name="apuracao_subsidio_v8", + _vars=dbt_vars_1, + ) + + # POST-DATA QUALITY CHECK # + SUBSIDIO_SPPO_DATA_QUALITY_POS = subsidio_data_quality_check( + mode="pos", + params=dbt_vars_1, + upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN], + ) + + dbt_vars_2 = get_join_dict( + dict_list=[dbt_vars_1], + new_dict=date_intervals["second_range"], + upstream_tasks=[SUBSIDIO_SPPO_DATA_QUALITY_POS], + )[0] + + SUBSIDIO_SPPO_APURACAO_RUN_2 = run_dbt_selector( + selector_name="apuracao_subsidio_v9", + _vars=dbt_vars_2, + upstream_tasks=[dbt_vars_2], + ) + + # POST-DATA QUALITY CHECK # + SUBSIDIO_SPPO_DATA_QUALITY_POS_2 = subsidio_data_quality_check( + mode="pos", + params=dbt_vars_2, + upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN_2], + ) + + with case(date_in_range, False): + gte = GreaterThanOrEqual() + gte_result = gte.run(_vars["start_date"], constants.DATA_SUBSIDIO_V9_INICIO.value) + + with case(gte_result, False): + SUBSIDIO_SPPO_APURACAO_RUN = run_dbt_selector( + selector_name="apuracao_subsidio_v8", + _vars=_vars, + ) + + with case(gte_result, True): + SUBSIDIO_SPPO_APURACAO_RUN = run_dbt_selector( + selector_name="apuracao_subsidio_v9", + _vars=_vars, + ) + + # POST-DATA QUALITY CHECK # + SUBSIDIO_SPPO_DATA_QUALITY_POS = subsidio_data_quality_check( + mode="pos", + params=_vars, + upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN], + ) # TODO: test upstream_tasks=[SUBSIDIO_SPPO_DASHBOARD_RUN] # 6. PUBLISH # diff --git a/pipelines/migration/projeto_subsidio_sppo/tasks.py b/pipelines/migration/projeto_subsidio_sppo/tasks.py index 18363a2c6..1b540a90e 100644 --- a/pipelines/migration/projeto_subsidio_sppo/tasks.py +++ b/pipelines/migration/projeto_subsidio_sppo/tasks.py @@ -6,6 +6,7 @@ from datetime import datetime, timedelta from prefect import task +from prefect.tasks.core.operators import GreaterThanOrEqual from pipelines.constants import constants as smtr_constants from pipelines.migration.projeto_subsidio_sppo.constants import constants @@ -60,9 +61,20 @@ def subsidio_data_quality_check( ).strftime("%Y-%m-%d %H:%M:%S"), } + gte = GreaterThanOrEqual() + gte_result = gte.run(params["start_date"], constants.DATA_SUBSIDIO_V9_INICIO.value) + if mode == "pos": request_params["end_timestamp"] = f"""{params["end_date"]} 00:00:00""" request_params["dataset_id"] = constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value + if gte_result: + request_params["dataset_id_v2"] = constants.SUBSIDIO_SPPO_DASHBOARD_V2_DATASET_ID.value + request_params[ + "table_id_v2" + ] = constants.SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID_V2.value + else: + request_params["dataset_id_v2"] = constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value + request_params["table_id_v2"] = constants.SUBSIDIO_SPPO_DASHBOARD_SUMARIO_TABLE_ID.value checks_list = ( constants.SUBSIDIO_SPPO_DATA_CHECKS_PRE_LIST.value @@ -132,6 +144,7 @@ def subsidio_data_quality_check( else ":warning: **Status:** Testes falharam. Necessidade de revisão dos dados finais!\n" ) + # fmt: off if not test_check: at_code_owners = [ f' - <@{smtr_constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["user_id"]}>\n' @@ -145,7 +158,7 @@ def subsidio_data_quality_check( ] formatted_messages.extend(at_code_owners) - + # fmt: on format_send_discord_message(formatted_messages, webhook_url) return test_check diff --git a/pipelines/migration/tasks.py b/pipelines/migration/tasks.py index ded988b9a..d5b7a2b79 100644 --- a/pipelines/migration/tasks.py +++ b/pipelines/migration/tasks.py @@ -1466,6 +1466,16 @@ def get_previous_date(days): return now.to_date_string() +@task(checkpoint=False) +def get_posterior_date(days: int) -> str: + """ + Returns the date of {days} days from now in YYYY-MM-DD. + """ + now = pendulum.now(pendulum.timezone("America/Sao_Paulo")).add(days=days) + + return now.to_date_string() + + ############### # # Pretreat data @@ -1824,3 +1834,35 @@ def get_current_flow_mode() -> str: @task def get_flow_project(): return prefect.context.get("project_name") + + +@task +def check_date_in_range(start_date: str, end_date: str, comparison_date: str) -> bool: + """ + Check if comparison_date is between start_date and end_date. + """ + start_date = datetime.strptime(start_date, "%Y-%m-%d").date() + end_date = datetime.strptime(end_date, "%Y-%m-%d").date() + comparison_date = datetime.strptime(comparison_date, "%Y-%m-%d").date() + + return start_date < comparison_date <= end_date + + +@task +def split_date_range(start_date: str, end_date: str, comparison_date: str) -> dict: + """ + Split the date range into two ranges on comparison_date. + Returns the first and second ranges. + """ + comparison_date = datetime.strptime(comparison_date, "%Y-%m-%d").date() + + return { + "first_range": { + "start_date": start_date, + "end_date": (comparison_date - timedelta(days=1)).strftime("%Y-%m-%d"), + }, + "second_range": { + "start_date": comparison_date.strftime("%Y-%m-%d"), + "end_date": end_date, + }, + } diff --git a/pipelines/tasks.py b/pipelines/tasks.py index 3a252ee21..c91fc7cd0 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -5,6 +5,15 @@ import prefect from prefect import task + +try: + from prefect.tasks.dbt.dbt import DbtShellTask +except ImportError: + from prefeitura_rio.utils import base_assert_dependencies + + base_assert_dependencies(["prefect"], extras=["pipelines"]) + +from prefeitura_rio.pipelines_utils.io import get_root_path from prefeitura_rio.pipelines_utils.logging import log from prefeitura_rio.pipelines_utils.prefect import get_flow_run_mode from pytz import timezone @@ -216,3 +225,49 @@ def run_subflow( if flag_failed_runs: raise FailedSubFlow(failed_message) + + +@task +def run_dbt_selector( + selector_name: str, + flags: str = None, + _vars: dict | list[dict] = None, +): + """ + Runs a DBT selector. + + Args: + selector_name (str): The name of the DBT selector to run. + flags (str, optional): Flags to pass to the dbt run command. + _vars (Union[dict, list[dict]], optional): Variables to pass to dbt. Defaults to None. + """ + # Build the dbt command + run_command = f"dbt run --selector {selector_name}" + + if _vars: + if isinstance(_vars, list): + vars_dict = {} + for elem in _vars: + vars_dict.update(elem) + vars_str = f'"{vars_dict}"' + run_command += f" --vars {vars_str}" + else: + vars_str = f'"{_vars}"' + run_command += f" --vars {vars_str}" + + if flags: + run_command += f" {flags}" + + log(f"Running dbt with command: {run_command}") + root_path = get_root_path() + queries_dir = str(root_path / "queries") + dbt_task = DbtShellTask( + profiles_dir=queries_dir, + helper_script=f"cd {queries_dir}", + log_stderr=True, + return_all=True, + command=run_command, + ) + dbt_logs = dbt_task.run() + + log("\n".join(dbt_logs)) diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index cb7cdb6c1..7eda4f612 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -109,6 +109,8 @@ vars: DATA_SUBSIDIO_V2_INICIO: "2023-01-16" # Feature penalidade de autuação por segurança e limpeza/equipamento (DECRETO RIO 52820/2023) DATA_SUBSIDIO_V3_INICIO: "2023-07-04" + # Feature viagens remuneradas (RESOLUÇÃO SMTR Nº 3645/2023) + DATA_SUBSIDIO_V3A_INICIO: "2023-09-16" # Feature penalidade aplicada por agente de verão (DECRETO RIO 53856/2023 e RESOLUÇÃO SMTR 3682/2024) DATA_SUBSIDIO_V4_INICIO: "2024-01-04" # Feature penalidade de vistoria (RESOLUÇÃO SMTR 3683/2024) @@ -119,6 +121,8 @@ vars: DATA_SUBSIDIO_V7_INICIO: "2024-05-01" # Feature Viagens sem transação DATA_SUBSIDIO_V8_INICIO: "2024-07-19" + # Feature Apuração por faixa horária + DATA_SUBSIDIO_V9_INICIO: "2024-08-16" # valor_subsidio: "`rj-smtr-dev.projeto_subsidio_sppo.valor_subsidio`" # Recursos # @@ -289,6 +293,17 @@ models: staging: +materialized: incremental +schema: validacao_dados_jae_staging + financeiro: + +materialized: incremental + +incremental_strategy: insert_overwrite + +schema: financeiro + staging: + +materialized: incremental + +incremental_strategy: insert_overwrite + +schema: financeiro_staging + dashboard_subsidio_sppo_v2: + +materialized: view + +schema: dashboard_subsidio_sppo_v2 subsidio: +materialized: incremental +incremental_strategy: insert_overwrite @@ -296,6 +311,10 @@ models: catalogo: +materialized: view +schema: catalogo + planejamento: + +materialized: incremental + +incremental_strategy: insert_overwrite + +schema: planejamento transito: +materialized: incremental +schema: transito diff --git a/queries/models/dashboard_subsidio_sppo/CHANGELOG.md b/queries/models/dashboard_subsidio_sppo/CHANGELOG.md index ef868cdce..39b044277 100644 --- a/queries/models/dashboard_subsidio_sppo/CHANGELOG.md +++ b/queries/models/dashboard_subsidio_sppo/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog - dashboard_subsidio_sppo +## [7.0.2] - 2024-08-29 + +### Alterado + +- Alterado os modelos `sumario_dia`, `sumario_servico_dia` e `sumario_servico_dia_tipo` em razão de alterações no modelo `viagem_planejada.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + +- Alterado modelo `viagens_remuneradas` em razão da apuração por faixa horária (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + ## [7.0.1] - 2024-08-19 ## Adicionado diff --git a/queries/models/dashboard_subsidio_sppo/schema.yml b/queries/models/dashboard_subsidio_sppo/schema.yml index 82dd25658..14d14e11f 100644 --- a/queries/models/dashboard_subsidio_sppo/schema.yml +++ b/queries/models/dashboard_subsidio_sppo/schema.yml @@ -362,5 +362,7 @@ models: description: "Distância planejada da viagem (km)." - name: subsidio_km description: "Valor de subsídio, conforme tipo de viagem (R$/km)." - - name: indicador_viagem_remunerada - description: "Indicador se a viagem foi remunerada ou não." \ No newline at end of file + - name: subsidio_km_teto + description: "Valor máximo de subsídio, conforme tipo de viagem (R$/km)." + - name: indicador_viagem_dentro_limite + description: "Indica se a viagem foi remunerada por estar abaixo do teto de 120%/200%." \ No newline at end of file diff --git a/queries/models/dashboard_subsidio_sppo/sumario_dia.sql b/queries/models/dashboard_subsidio_sppo/sumario_dia.sql index ca7b8a857..3ce792eeb 100644 --- a/queries/models/dashboard_subsidio_sppo/sumario_dia.sql +++ b/queries/models/dashboard_subsidio_sppo/sumario_dia.sql @@ -10,10 +10,14 @@ WITH CASE WHEN sentido = "C" THEN MAX(distancia_planejada) ELSE - SUM(distancia_planejada) - END + SUM(distancia_planejada) + END AS distancia_planejada, - MAX(distancia_total_planejada) AS distancia_total_planejada, + CASE + WHEN sentido = "C" THEN MAX(distancia_total_planejada) + ELSE + SUM(distancia_total_planejada) + END AS distancia_total_planejada, NULL AS viagens_planejadas FROM {{ ref("viagem_planejada") }} diff --git a/queries/models/dashboard_subsidio_sppo/sumario_servico_dia.sql b/queries/models/dashboard_subsidio_sppo/sumario_servico_dia.sql index 826a363b1..3727375b7 100644 --- a/queries/models/dashboard_subsidio_sppo/sumario_servico_dia.sql +++ b/queries/models/dashboard_subsidio_sppo/sumario_servico_dia.sql @@ -16,7 +16,8 @@ WITH tipo_dia, consorcio, servico, - distancia_total_planejada AS km_planejada, + sentido, + SUM(distancia_total_planejada) AS km_planejada, FROM {{ ref("viagem_planejada") }} -- `rj-smtr`.`projeto_subsidio_sppo`.`viagem_planejada` @@ -25,6 +26,7 @@ WITH AND DATE( "{{ var("end_date") }}" ) AND ( distancia_total_planejada > 0 OR distancia_total_planejada IS NULL ) + GROUP BY 1,2,3,4,5 ), -- 2. Viagens realizadas viagem AS ( @@ -52,7 +54,7 @@ WITH COALESCE(SUM(v.distancia_planejada), 0) AS km_apurada, COALESCE(ROUND(100 * SUM(v.distancia_planejada) / p.km_planejada,2), 0) AS perc_km_planejada FROM - planejado AS p + (SELECT DISTINCT * EXCEPT(sentido) FROM planejado ) AS p LEFT JOIN viagem AS v USING @@ -77,7 +79,7 @@ WITH WHERE DATA BETWEEN DATE("{{ var("start_date") }}") AND DATE( "{{ var("end_date") }}" ) - AND indicador_viagem_remunerada IS TRUE), + AND indicador_viagem_dentro_limite IS TRUE), servico_subsidio_apuracao AS ( SELECT DATA, diff --git a/queries/models/dashboard_subsidio_sppo/sumario_servico_dia_tipo.sql b/queries/models/dashboard_subsidio_sppo/sumario_servico_dia_tipo.sql index 5ee6b7f79..a95dd0f19 100644 --- a/queries/models/dashboard_subsidio_sppo/sumario_servico_dia_tipo.sql +++ b/queries/models/dashboard_subsidio_sppo/sumario_servico_dia_tipo.sql @@ -13,7 +13,8 @@ WITH tipo_dia, consorcio, servico, - distancia_total_planejada AS km_planejada + sentido, + SUM(distancia_total_planejada) AS km_planejada FROM -- rj-smtr.projeto_subsidio_sppo.viagem_planejada {{ ref("viagem_planejada") }} @@ -25,6 +26,7 @@ WITH {% endif %} AND (distancia_total_planejada > 0 OR distancia_total_planejada IS NOT NULL) + GROUP BY 1,2,3,4,5 ), viagem AS ( SELECT @@ -80,7 +82,7 @@ WITH IFNULL(v.viagens, 0) AS viagens, IFNULL(v.km_apurada, 0) AS km_apurada, FROM - planejado p + (SELECT DISTINCT * EXCEPT(sentido) FROM planejado ) p LEFT JOIN servico_km_tipo_atualizado v ON diff --git a/queries/models/dashboard_subsidio_sppo/viagens_remuneradas.sql b/queries/models/dashboard_subsidio_sppo/viagens_remuneradas.sql index c0a6c0bf4..047c92994 100644 --- a/queries/models/dashboard_subsidio_sppo/viagens_remuneradas.sql +++ b/queries/models/dashboard_subsidio_sppo/viagens_remuneradas.sql @@ -22,6 +22,9 @@ WITH tipo_dia, consorcio, servico, + faixa_horaria_inicio, + faixa_horaria_fim, + partidas_total_planejada, distancia_total_planejada AS km_planejada, FROM {{ ref("viagem_planejada") }} @@ -64,9 +67,15 @@ WITH ), viagem_planejada AS ( SELECT - p.*, - viagens_planejadas, - v.partidas_ida + v.partidas_volta AS viagens_planejadas_ida_volta + p.data, + p.tipo_dia, + p.consorcio, + p.servico, + p.faixa_horaria_inicio, + p.faixa_horaria_fim, + v.viagens_planejadas, + p.km_planejada, + IF(p.data >= DATE("{{ var("DATA_SUBSIDIO_V9_INICIO") }}"), p.partidas_total_planejada, v.partidas_ida + v.partidas_volta) AS viagens_planejadas_ida_volta FROM planejado AS p LEFT JOIN @@ -88,7 +97,9 @@ WITH DISTINCT data_inicio, data_fim, status, - subsidio_km + subsidio_km, + MAX(subsidio_km) OVER (PARTITION BY data_inicio, data_fim) AS subsidio_km_teto, + indicador_penalidade_judicial FROM {{ ref("subsidio_valor_km_tipo_viagem") }} -- rj-smtr-staging.dashboard_subsidio_sppo_staging.subsidio_valor_km_tipo_viagem @@ -111,83 +122,86 @@ WITH vt.servico, vt.tipo_viagem, vt.id_viagem, + vt.datetime_partida, vt.distancia_planejada, - t.subsidio_km + t.subsidio_km, + t.subsidio_km_teto, + t.indicador_penalidade_judicial FROM viagem_transacao AS vt LEFT JOIN subsidio_parametros AS t ON - vt.data BETWEEN t.data_inicio AND t.data_fim - AND vt.tipo_viagem = t.status - ), + vt.data BETWEEN t.data_inicio + AND t.data_fim + AND vt.tipo_viagem = t.status ), -- 5. Apuração de km realizado e Percentual de Operação Diário (POD) - servico_km_apuracao AS ( + servico_faixa_km_apuracao AS ( SELECT p.data, p.tipo_dia, + p.faixa_horaria_inicio, + p.faixa_horaria_fim, p.consorcio, p.servico, p.km_planejada AS km_planejada, - COALESCE(COUNT(v.id_viagem), 0) AS viagens, - COALESCE(SUM(v.distancia_planejada), 0) AS km_apurada, - COALESCE(ROUND(100 * SUM(v.distancia_planejada) / p.km_planejada,2), 0) AS perc_km_planejada + COALESCE(ROUND(100 * SUM(v.distancia_planejada) / p.km_planejada,2), 0) AS pof FROM viagem_planejada AS p LEFT JOIN viagem_km_tipo AS v - USING - (data, - servico) + ON + p.data = v.data + AND p.servico = v.servico + AND v.datetime_partida BETWEEN p.faixa_horaria_inicio + AND p.faixa_horaria_fim GROUP BY - 1, - 2, - 3, - 4, - 5 ) + 1, 2, 3, 4, 5, 6, 7 + ) -- 6. Flag de viagens que serão consideradas ou não para fins de remuneração (apuração de valor de subsídio) - RESOLUÇÃO SMTR Nº 3645/2023 SELECT - v.* EXCEPT(rn), + v.* EXCEPT(rn, datetime_partida), CASE - WHEN v.tipo_viagem = "Sem transação" - THEN FALSE - WHEN data >= "2023-09-16" - AND p.tipo_dia = "Dia Útil" - AND viagens_planejadas > 10 - AND perc_km_planejada > 120 - AND rn > viagens_planejadas_ida_volta*1.2 - THEN FALSE - WHEN data >= "2023-09-16" - AND p.tipo_dia = "Dia Útil" - AND viagens_planejadas <= 10 - AND perc_km_planejada > 200 - AND rn > viagens_planejadas_ida_volta*2 - THEN FALSE - WHEN data >= "2023-09-16" - AND (p.tipo_dia = "Dia Útil" - AND (viagens_planejadas IS NULL - OR perc_km_planejada IS NULL - OR rn IS NULL - OR viagens_planejadas_ida_volta IS NULL + WHEN p.data >= DATE("{{ var("DATA_SUBSIDIO_V3A_INICIO") }}") + AND p.tipo_dia = "Dia Útil" + AND viagens_planejadas > 10 + AND pof > 120 + AND rn > viagens_planejadas_ida_volta*1.2 + THEN FALSE + WHEN p.data >= DATE("{{ var("DATA_SUBSIDIO_V3A_INICIO") }}") + AND p.tipo_dia = "Dia Útil" + AND viagens_planejadas <= 10 + AND pof > 200 + AND rn > viagens_planejadas_ida_volta*2 + THEN FALSE + WHEN p.data >= DATE("{{ var("DATA_SUBSIDIO_V3A_INICIO") }}") + AND (p.tipo_dia = "Dia Útil" + AND (viagens_planejadas IS NULL + OR pof IS NULL + OR rn IS NULL + ) ) - ) THEN NULL ELSE TRUE - END AS indicador_viagem_remunerada + END AS indicador_viagem_dentro_limite FROM ( SELECT - *, - ROW_NUMBER() OVER(PARTITION BY data, servico ORDER BY subsidio_km*distancia_planejada DESC) AS rn + *, + ROW_NUMBER() OVER(PARTITION BY data, servico ORDER BY subsidio_km*distancia_planejada DESC) AS rn FROM - viagem_km_tipo ) AS v + viagem_km_tipo ) AS v LEFT JOIN - viagem_planejada AS p -USING - (data, - servico) + viagem_planejada AS p +ON + p.data = v.data + AND p.servico = v.servico + AND v.datetime_partida BETWEEN p.faixa_horaria_inicio + AND p.faixa_horaria_fim LEFT JOIN - servico_km_apuracao AS s -USING - (data, - servico) \ No newline at end of file + servico_faixa_km_apuracao AS s +ON + s.data = v.data + AND s.servico = v.servico + AND v.datetime_partida BETWEEN s.faixa_horaria_inicio + AND s.faixa_horaria_fim \ No newline at end of file diff --git a/queries/models/dashboard_subsidio_sppo_staging/CHANGELOG.md b/queries/models/dashboard_subsidio_sppo_staging/CHANGELOG.md new file mode 100644 index 000000000..b883ffe91 --- /dev/null +++ b/queries/models/dashboard_subsidio_sppo_staging/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - dashboard_subsidio_sppo_staging + +## [1.0.0] - 2024-08-29 + +### Adicionado + +- Adicionada coluna `indicador_penalidade_judicial` na view `subsidio_valor_km_tipo_viagem` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) \ No newline at end of file diff --git a/queries/models/dashboard_subsidio_sppo_staging/subsidio_valor_km_tipo_viagem.sql b/queries/models/dashboard_subsidio_sppo_staging/subsidio_valor_km_tipo_viagem.sql index 32265c350..a6eda52b3 100644 --- a/queries/models/dashboard_subsidio_sppo_staging/subsidio_valor_km_tipo_viagem.sql +++ b/queries/models/dashboard_subsidio_sppo_staging/subsidio_valor_km_tipo_viagem.sql @@ -10,6 +10,7 @@ SELECT SAFE_CAST(irk AS FLOAT64) irk, SAFE_CAST(data_inicio AS DATE) data_inicio, SAFE_CAST(data_fim AS DATE) data_fim, + SAFE_CAST(indicador_penalidade_judicial AS BOOL) indicador_penalidade_judicial, SAFE_CAST(legislacao AS STRING) legislacao FROM {{ source("dashboard_subsidio_sppo_staging", "subsidio_valor_km_tipo_viagem") }} \ No newline at end of file diff --git a/queries/models/dashboard_subsidio_sppo_v2/CHANGELOG.md b/queries/models/dashboard_subsidio_sppo_v2/CHANGELOG.md new file mode 100644 index 000000000..290d351a2 --- /dev/null +++ b/queries/models/dashboard_subsidio_sppo_v2/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - dashboard_subsidio_sppo_v2 + +## [1.0.0] - 2024-08-29 + +### Adicionado + +- Cria modelos `sumario_faixa_servico_dia` e `sumario_servico_dia_pagamento` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) \ No newline at end of file diff --git a/queries/models/dashboard_subsidio_sppo_v2/schema.yml b/queries/models/dashboard_subsidio_sppo_v2/schema.yml new file mode 100644 index 000000000..5bfc35034 --- /dev/null +++ b/queries/models/dashboard_subsidio_sppo_v2/schema.yml @@ -0,0 +1,117 @@ +version: 2 + +models: + - name: sumario_faixa_servico_dia + description: "Sumário do subsídio dos serviços de ônibus (SPPO) por dia e faixa horária." + columns: + - name: data + description: "Data de emissão do sinal de GPS." + - name: tipo_dia + description: "Dia da semana considerado para o cálculo da distância planejada - categorias: Dia Útil, Sabado, Domingo." + - name: faixa_horaria_inicio + description: "Horário inicial da faixa horária" + - name: faixa_horaria_fim + description: "Horário final da faixa horária" + - name: consorcio + description: "Consórcio que opera o serviço." + - name: servico + description: "Serviço realizado pelo veículo." + - name: viagens_faixa + description: "Quantidade de viagens apuradas por faixa horária." + - name: km_apurada_faixa + description: "Distância apurada para o serviço por faixa horária(km)." + - name: km_subsidiada_faixa + description: "Distância subsidiada para o serviço por faixa horária (km)." + - name: km_planejada_faixa + description: "Distância planejada para o serviço por faixa horária (km)." + - name: pof + description: "Indicador percentual de quilometragem apurada em relação à planejada da linha por faixa horária." + - name: km_apurada_registrado_com_ar_inoperante + description: "Distância apurada de viagens realizadas por veículo licenciado com ar condicionado e registrado por agente de verão (RESOLUÇÃO SMTR Nº 3.682/2024) em razão de inoperância ou mau funcionamento deste (km)." + - name: km_apurada_n_licenciado + description: "Quilometragem apurada de viagens de veículos não licenciados." + - name: km_apurada_autuado_ar_inoperante + description: "Quilometragem apurada de viagens de veículos autuados por ar inoperante." + - name: km_apurada_autuado_seguranca + description: "Quilometragem apurada de viagens de veículos autuados por segurança." + - name: km_apurada_autuado_limpezaequipamento + description: "Quilometragem apurada de viagens de veículos autuados por limpeza ou equipamento." + - name: km_apurada_licenciado_sem_ar_n_autuado + description: "Quilometragem apurada de viagens de veículos sem ar e não autuados." + - name: km_apurada_licenciado_com_ar_n_autuado + description: "Quilometragem apurada de viagens de veículos com ar e não autuados." + - name: km_apurada_n_vistoriado + description: "Distância apurada de viagens realizadas por veículo não vistoriado tempestivamente conforme calendário de vistoria (km)." + - name: km_apurada_sem_transacao + description: "Distância apurada de viagens realizadas sem passageiro registrado." + - name: valor_apurado + description: "Valor da distância apurada multiplicada pelo subsídio por quilômetro (sem glosa). É zerado quando POF < 80%." + - name: valor_acima_limite + description: "Valor apurado das viagens que não foram remuneradas (por estar acima do teto de 120% / 200%)." + - name: valor_total_sem_glosa + description: "Valor total das viagens considerando o valor máximo por km." + - name: valor_judicial + description: "Valor de glosa depositada em juízo (Autuação por ar inoperante, Veículo licenciado sem ar, Penalidade abaixo de 60% e Notificação dos Agentes de Verão)." + - name: versao + description: "{{ doc('versao') }}" + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" + - name: sumario_servico_dia_pagamento + description: "Sumário do subsídio dos serviços de ônibus (SPPO) por dia." + columns: + - name: data + description: "Data de emissão do sinal de GPS." + - name: tipo_dia + description: "Dia da semana considerado para o cálculo da distância planejada - categorias: Dia Útil, Sabado, Domingo." + - name: consorcio + description: "Consórcio que opera o serviço." + - name: servico + description: "Serviço realizado pelo veículo." + - name: viagens_dia + description: "Quantidade de viagens apuradas por dia." + - name: km_apurada_dia + description: "Distância apurada para o serviço por dia (km)." + - name: km_subsidiada_dia + description: "Distância subsidiada para o serviço por dia (km)." + - name: km_planejada_dia + description: "Distância planejada para o serviço por dia (km)." + - name: media_pof + description: "Média do indicador percentual de operação por faixa horária." + - name: desvp_pof + description: "Desvio padrão do indicador percentual de operação por faixa horária." + - name: km_apurada_registrado_com_ar_inoperante + description: "Distância apurada de viagens realizadas por veículo licenciado com ar condicionado e registrado por agente de verão (RESOLUÇÃO SMTR Nº 3.682/2024) em razão de inoperância ou mau funcionamento deste (km)." + - name: km_apurada_n_licenciado + description: "Quilometragem apurada de viagens de veículos não licenciados." + - name: km_apurada_autuado_ar_inoperante + description: "Quilometragem apurada de viagens de veículos autuados por ar inoperante." + - name: km_apurada_autuado_seguranca + description: "Quilometragem apurada de viagens de veículos autuados por segurança." + - name: km_apurada_autuado_limpezaequipamento + description: "Quilometragem apurada de viagens de veículos autuados por limpeza ou equipamento." + - name: km_apurada_licenciado_sem_ar_n_autuado + description: "Quilometragem apurada de viagens de veículos sem ar e não autuados." + - name: km_apurada_licenciado_com_ar_n_autuado + description: "Quilometragem apurada de viagens de veículos com ar e não autuados." + - name: km_apurada_n_vistoriado + description: "Distância apurada de viagens realizadas por veículo não vistoriado tempestivamente conforme calendário de vistoria (km)." + - name: km_apurada_sem_transacao + description: "Distância apurada de viagens realizadas sem passageiro registrado." + - name: valor_a_pagar + description: "Valor efetivo de pagamento (valor_total_apurado - valor_acima_limite - valor_glosado)." + - name: valor_glosado + description: "Valor total das viagens considerando o valor máximo por km, subtraído pelo valor efetivo por km." + - name: valor_acima_limite + description: "Valor apurado das viagens que não foram remuneradas (por estar acima do teto de 120% / 200%)." + - name: valor_total_sem_glosa + description: "Valor total das viagens considerando o valor máximo por km." + - name: valor_total_apurado + description: "Valor total das viagens apuradas, subtraídas as penalidades (POF =< 60%)." + - name: valor_judicial + description: "Valor de glosa depositada em juízo (Autuação por ar inoperante, Veículo licenciado sem ar, Penalidade abaixo de 60% e Notificação dos Agentes de Verão)." + - name: valor_penalidade + description: "Valor penalidade [negativa] (POF =< 60%)." + - name: versao + description: "{{ doc('versao') }}" + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" \ No newline at end of file diff --git a/queries/models/dashboard_subsidio_sppo_v2/sumario_faixa_servico_dia.sql b/queries/models/dashboard_subsidio_sppo_v2/sumario_faixa_servico_dia.sql new file mode 100644 index 000000000..f0564a148 --- /dev/null +++ b/queries/models/dashboard_subsidio_sppo_v2/sumario_faixa_servico_dia.sql @@ -0,0 +1,111 @@ +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + incremental_strategy="insert_overwrite", + ) +}} + +WITH + subsidio_faixa AS ( + SELECT + data, + tipo_dia, + faixa_horaria_inicio, + faixa_horaria_fim, + consorcio, + servico, + viagens_faixa, + km_apurada_faixa, + km_planejada_faixa, + pof + FROM + -- rj-smtr-dev.financeiro.subsidio_faixa_servico_dia + {{ ref("subsidio_faixa_servico_dia") }} + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ), + subsidio_faixa_agg AS ( + SELECT + data, + tipo_dia, + faixa_horaria_inicio, + faixa_horaria_fim, + consorcio, + servico, + SUM(km_subsidiada_faixa) AS km_subsidiada_faixa, + SUM(valor_apurado) AS valor_apurado, + SUM(valor_acima_limite) AS valor_acima_limite, + SUM(valor_total_sem_glosa) AS valor_total_sem_glosa + FROM + -- rj-smtr-dev.financeiro.subsidio_faixa_servico_dia_tipo_viagem + {{ ref("subsidio_faixa_servico_dia_tipo_viagem") }} + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + GROUP BY + data, + tipo_dia, + faixa_horaria_inicio, + faixa_horaria_fim, + consorcio, + servico + ), + pivot_data AS ( + SELECT + * + FROM ( + SELECT + data, + tipo_dia, + faixa_horaria_inicio, + faixa_horaria_fim, + consorcio, + servico, + tipo_viagem, + km_apurada_faixa + FROM + {{ ref("subsidio_faixa_servico_dia_tipo_viagem") }} + -- rj-smtr-dev.financeiro.subsidio_faixa_servico_dia_tipo_viagem + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ) + PIVOT(SUM(km_apurada_faixa) AS km_apurada FOR tipo_viagem IN ( + "Registrado com ar inoperante" AS registrado_com_ar_inoperante, + "Não licenciado" AS n_licenciado, + "Autuado por ar inoperante" AS autuado_ar_inoperante, + "Autuado por segurança" AS autuado_seguranca, + "Autuado por limpeza/equipamento" AS autuado_limpezaequipamento, + "Licenciado sem ar e não autuado" AS licenciado_sem_ar_n_autuado, + "Licenciado com ar e não autuado" AS licenciado_com_ar_n_autuado, + "Não vistoriado" AS n_vistoriado, + "Sem transação" AS sem_transacao)) + ) +SELECT + s.data, + s.tipo_dia, + s.faixa_horaria_inicio, + s.faixa_horaria_fim, + s.consorcio, + s.servico, + s.viagens_faixa, + s.km_apurada_faixa, + agg.km_subsidiada_faixa, + s.km_planejada_faixa, + s.pof, + pd.* EXCEPT(data, tipo_dia, faixa_horaria_inicio, faixa_horaria_fim, servico, consorcio), + agg.valor_apurado, + agg.valor_acima_limite, + agg.valor_total_sem_glosa, + '{{ var("version") }}' as versao, + CURRENT_DATETIME("America/Sao_Paulo") as datetime_ultima_atualizacao +FROM + subsidio_faixa AS s +LEFT JOIN + subsidio_faixa_agg AS agg +USING(data, tipo_dia, faixa_horaria_inicio, faixa_horaria_fim, consorcio, servico) +LEFT JOIN + pivot_data AS pd +USING(data, tipo_dia, faixa_horaria_inicio, faixa_horaria_fim, consorcio, servico) \ No newline at end of file diff --git a/queries/models/dashboard_subsidio_sppo_v2/sumario_servico_dia_pagamento.sql b/queries/models/dashboard_subsidio_sppo_v2/sumario_servico_dia_pagamento.sql new file mode 100644 index 000000000..1e18f8a76 --- /dev/null +++ b/queries/models/dashboard_subsidio_sppo_v2/sumario_servico_dia_pagamento.sql @@ -0,0 +1,97 @@ +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + incremental_strategy="insert_overwrite", + ) +}} + +WITH + subsidio_dia AS ( + SELECT + data, + tipo_dia, + consorcio, + servico, + SAFE_CAST(AVG(pof) AS NUMERIC) AS media_pof, + SAFE_CAST(STDDEV(pof) AS NUMERIC) AS desvp_pof + FROM + {{ ref("subsidio_faixa_servico_dia") }} + -- rj-smtr-dev.financeiro.subsidio_faixa_servico_dia + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + GROUP BY + data, + tipo_dia, + consorcio, + servico + ), + valores_subsidio AS ( + SELECT + * + FROM + {{ ref("subsidio_sumario_servico_dia_pagamento") }} + -- rj-smtr-dev.financeiro.subsidio_sumario_servico_dia_pagamento + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ), + pivot_data AS ( + SELECT + * + FROM ( + SELECT + data, + tipo_dia, + consorcio, + servico, + tipo_viagem, + km_apurada_faixa + FROM + {{ ref("subsidio_faixa_servico_dia_tipo_viagem") }} + -- rj-smtr-dev.financeiro.subsidio_faixa_servico_dia_tipo_viagem + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ) + PIVOT(SUM(km_apurada_faixa) AS km_apurada FOR tipo_viagem IN ( + "Registrado com ar inoperante" AS registrado_com_ar_inoperante, + "Não licenciado" AS n_licenciado, + "Autuado por ar inoperante" AS autuado_ar_inoperante, + "Autuado por segurança" AS autuado_seguranca, + "Autuado por limpeza/equipamento" AS autuado_limpezaequipamento, + "Licenciado sem ar e não autuado" AS licenciado_sem_ar_n_autuado, + "Licenciado com ar e não autuado" AS licenciado_com_ar_n_autuado, + "Não vistoriado" AS n_vistoriado, + "Sem transação" AS sem_transacao)) + ) +SELECT + vs.data, + vs.tipo_dia, + vs.consorcio, + vs.servico, + vs.viagens_dia, + vs.km_apurada_dia, + vs.km_subsidiada_dia, + vs.km_planejada_dia, + sd.media_pof, + sd.desvp_pof, + pd.* EXCEPT(data, tipo_dia, servico, consorcio), + vs.valor_a_pagar, + vs.valor_glosado, + vs.valor_acima_limite, + vs.valor_total_sem_glosa, + vs.valor_total_apurado, + vs.valor_judicial, + vs.valor_penalidade, + '{{ var("version") }}' as versao, + CURRENT_DATETIME("America/Sao_Paulo") as datetime_ultima_atualizacao +FROM + valores_subsidio AS vs +LEFT JOIN + subsidio_dia AS sd +USING(data, tipo_dia, consorcio, servico) +LEFT JOIN + pivot_data AS pd +USING(data, tipo_dia, consorcio, servico) \ No newline at end of file diff --git a/queries/models/docs.md b/queries/models/docs.md index 65fb6c56d..031c6ade9 100644 --- a/queries/models/docs.md +++ b/queries/models/docs.md @@ -144,6 +144,26 @@ Tipo da gratuidade (Estudante, PCD, Sênior) Tipo de pagamento utilizado {% enddocs %} +{% docs tipo_dia %} +Dia da semana - categorias: Dia Útil, Sábado, Domingo +{% enddocs %} + +{% docs faixa_horaria_inicio %} +Horário inicial da faixa horária +{% enddocs %} + +{% docs faixa_horaria_fim %} +Horário final da faixa horária +{% enddocs %} + +{% docs partidas %} +Quantidade de partidas planejadas +{% enddocs %} + +{% docs quilometragem %} +Quilometragem planejada +{% enddocs %} + {% docs timestamp_captura %} Timestamp de captura pela SMTR {% enddocs %} diff --git a/queries/models/financeiro/CHANGELOG.md b/queries/models/financeiro/CHANGELOG.md new file mode 100644 index 000000000..f572bbe94 --- /dev/null +++ b/queries/models/financeiro/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - financeiro + +## [1.0.0] - 2024-08-29 + +### Adicionado + +- Cria modelos `subsidio_penalidade_servico_dia`, `subsidio_faixa_servico_dia_tipo_viagem` e `subsidio_sumario_servico_dia_pagamento` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) \ No newline at end of file diff --git a/queries/models/financeiro/schema.yaml b/queries/models/financeiro/schema.yaml new file mode 100644 index 000000000..7319e005a --- /dev/null +++ b/queries/models/financeiro/schema.yaml @@ -0,0 +1,75 @@ +version: 2 + +models: + - name: subsidio_faixa_servico_dia_tipo_viagem + description: "Sumário do subsídio dos serviços de ônibus (SPPO) por dia, faixa horária e tipo de viagem." + columns: + - name: data + description: "Data de emissão do sinal de GPS." + - name: tipo_dia + description: "Dia da semana considerado para o cálculo da distância planejada - categorias: Dia Útil, Sabado, Domingo." + - name: faixa_horaria_inicio + description: "Horário inicial da faixa horária" + - name: faixa_horaria_fim + description: "Horário final da faixa horária" + - name: consorcio + description: "Consórcio que opera o serviço." + - name: servico + description: "Serviço realizado pelo veículo." + - name: indicador_ar_condicionado + description: "Indicador se o veículo foi licenciado com ar condicionado." + - name: tipo_viagem + description: "Tipo de viagem" + - name: viagens_faixa + description: "Quantidade de viagens apuradas por faixa horária." + - name: km_apurada_faixa + description: "Distância apurada para o serviço por faixa horária(km)." + - name: km_subsidiada_faixa + description: "Distância subsidiada para o serviço por faixa horária (km)" + - name: valor_apurado + description: "Valor da distância apurada multiplicada pelo subsídio por quilômetro (sem glosa). É zerado quando POF < 80%." + - name: valor_acima_limite + description: "Valor apurado das viagens que não foram remuneradas (por estar acima do teto de 120% / 200%)." + - name: valor_total_sem_glosa + description: "Valor total das viagens considerando o valor máximo por km." + - name: versao + description: "{{ doc('versao') }}" + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" + - name: subsidio_sumario_servico_dia_pagamento + description: "Sumário do subsídio dos serviços de ônibus (SPPO) por dia" + columns: + - name: data + description: "Data de emissão do sinal de GPS." + - name: tipo_dia + description: "Dia da semana considerado para o cálculo da distância planejada - categorias: Dia Útil, Sabado, Domingo." + - name: consorcio + description: "Consórcio que opera o serviço." + - name: servico + description: "Serviço realizado pelo veículo." + - name: viagens_dia + description: "Quantidade de viagens apuradas por dia." + - name: km_apurada_dia + description: "Distância apurada para o serviço por dia (km)." + - name: km_subsidiada_dia + description: "Distância subsidiada para o serviço por dia (km)." + - name: km_planejada_dia + description: "Distância planejada para o serviço por dia (km)." + - name: valor_a_pagar + description: "Valor efetivo de pagamento (valor_total_apurado - valor_acima_limite - valor_glosado)." + - name: valor_glosado + description: "Valor total das viagens considerando o valor máximo por km, subtraído pelo valor efetivo por km." + - name: valor_acima_limite + description: "Valor apurado das viagens que não foram remuneradas (por estar acima do teto de 120% / 200%)." + - name: valor_total_sem_glosa + description: "Valor total das viagens considerando o valor máximo por km." + - name: valor_total_apurado + description: "Valor total das viagens apuradas, subtraídas as penalidades (POF =< 60%)." + - name: valor_judicial + description: "Valor de glosa depositada em juízo (Autuação por ar inoperante, Veículo licenciado sem ar, Penalidade abaixo de 60% e Notificação dos Agentes de Verão)." + - name: valor_penalidade + description: "Valor penalidade [negativa] (POF =< 60%)." + - name: versao + description: "{{ doc('versao') }}" + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" \ No newline at end of file diff --git a/queries/models/financeiro/staging/CHANGELOG.md b/queries/models/financeiro/staging/CHANGELOG.md new file mode 100644 index 000000000..e98f01fba --- /dev/null +++ b/queries/models/financeiro/staging/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - financeiro_staging + +## [1.0.0] - 2024-09-02 + +### Adicionado + +- Cria modelo `subsidio_faixa_servico_dia` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) \ No newline at end of file diff --git a/queries/models/financeiro/staging/schema.yaml b/queries/models/financeiro/staging/schema.yaml new file mode 100644 index 000000000..da0e92d41 --- /dev/null +++ b/queries/models/financeiro/staging/schema.yaml @@ -0,0 +1,30 @@ +version: 2 + +models: + - name: subsidio_faixa_servico_dia + description: "Sumário do subsídio dos serviços de ônibus (SPPO) por dia e faixa horária." + columns: + - name: data + description: "Data de emissão do sinal de GPS." + - name: tipo_dia + description: "Dia da semana considerado para o cálculo da distância planejada - categorias: Dia Útil, Sabado, Domingo." + - name: faixa_horaria_inicio + description: "Horário inicial da faixa horária" + - name: faixa_horaria_fim + description: "Horário final da faixa horária" + - name: consorcio + description: "Consórcio que opera o serviço." + - name: servico + description: "Serviço realizado pelo veículo." + - name: viagens_faixa + description: "Quantidade de viagens apuradas por faixa horária." + - name: km_apurada_faixa + description: "Distância apurada para o serviço por faixa horária(km)." + - name: km_planejada_faixa + description: "Distância planejada para o serviço por faixa horária (km)." + - name: pof + description: "Indicador percentual de quilometragem apurada em relação à planejada da linha por faixa horária." + - name: versao + description: "{{ doc('versao') }}" + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" \ No newline at end of file diff --git a/queries/models/financeiro/staging/subsidio_faixa_servico_dia.sql b/queries/models/financeiro/staging/subsidio_faixa_servico_dia.sql new file mode 100644 index 000000000..5d1123213 --- /dev/null +++ b/queries/models/financeiro/staging/subsidio_faixa_servico_dia.sql @@ -0,0 +1,82 @@ +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + incremental_strategy="insert_overwrite", + ) +}} + +WITH +-- 1. Viagens planejadas + planejado AS ( + SELECT + DISTINCT data, + tipo_dia, + consorcio, + servico, + faixa_horaria_inicio, + faixa_horaria_fim, + distancia_total_planejada AS km_planejada + FROM + {{ ref("viagem_planejada") }} + -- rj-smtr.projeto_subsidio_sppo.viagem_planejada + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + AND distancia_total_planejada > 0 + ), +-- 2. Viagens realizadas + viagem AS ( + SELECT + data, + servico_realizado AS servico, + id_viagem, + datetime_partida, + distancia_planejada + FROM + {{ ref("viagem_completa") }} + -- rj-smtr.projeto_subsidio_sppo.viagem_completa + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ), +-- 3. Apuração de km realizado e Percentual de Operação por faixa + servico_km_apuracao AS ( + SELECT + p.data, + p.tipo_dia, + p.faixa_horaria_inicio, + p.faixa_horaria_fim, + p.consorcio, + p.servico, + SAFE_CAST(p.km_planejada AS NUMERIC) AS km_planejada_faixa, + SAFE_CAST(COALESCE(COUNT(v.id_viagem), 0) AS INT64) AS viagens_faixa, + SAFE_CAST(COALESCE(SUM(v.distancia_planejada), 0) AS NUMERIC) AS km_apurada_faixa, + SAFE_CAST(COALESCE(ROUND(100 * SUM(v.distancia_planejada) / p.km_planejada, 2), 0) AS NUMERIC) AS pof + FROM + planejado AS p + LEFT JOIN + viagem AS v + ON + p.data = v.data + AND p.servico = v.servico + AND v.datetime_partida BETWEEN p.faixa_horaria_inicio + AND p.faixa_horaria_fim + GROUP BY + p.data, p.tipo_dia, p.faixa_horaria_inicio, p.faixa_horaria_fim, p.consorcio, p.servico, p.km_planejada +) +SELECT + data, + tipo_dia, + faixa_horaria_inicio, + faixa_horaria_fim, + consorcio, + servico, + viagens_faixa, + km_apurada_faixa, + km_planejada_faixa, + pof, + '{{ var("version") }}' as versao, + CURRENT_DATETIME("America/Sao_Paulo") as datetime_ultima_atualizacao +FROM + servico_km_apuracao \ No newline at end of file diff --git a/queries/models/financeiro/subsidio_faixa_servico_dia_tipo_viagem.sql b/queries/models/financeiro/subsidio_faixa_servico_dia_tipo_viagem.sql new file mode 100644 index 000000000..7ca146bad --- /dev/null +++ b/queries/models/financeiro/subsidio_faixa_servico_dia_tipo_viagem.sql @@ -0,0 +1,159 @@ +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + incremental_strategy="insert_overwrite", + ) +}} + +WITH + subsidio_faixa_dia AS ( + SELECT + data, + tipo_dia, + faixa_horaria_inicio, + faixa_horaria_fim, + consorcio, + servico, + pof + FROM + {{ ref("subsidio_faixa_servico_dia") }} + -- rj-smtr.financeiro.subsidio_faixa_servico_dia + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ), + servico_km_apuracao AS ( + SELECT + data, + servico, + CASE + WHEN tipo_viagem = "Nao licenciado" THEN "Não licenciado" + WHEN tipo_viagem = "Licenciado com ar e autuado (023.II)" THEN "Autuado por ar inoperante" + WHEN tipo_viagem = "Licenciado sem ar" THEN "Licenciado sem ar e não autuado" + WHEN tipo_viagem = "Licenciado com ar e não autuado (023.II)" THEN "Licenciado com ar e não autuado" + ELSE tipo_viagem + END AS tipo_viagem, + id_viagem, + distancia_planejada, + subsidio_km, + subsidio_km_teto, + indicador_penalidade_judicial, + indicador_viagem_dentro_limite + FROM + {{ ref("viagens_remuneradas") }} + -- rj-smtr.dashboard_subsidio_sppo.viagens_remuneradas + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ), + indicador_ar AS ( + SELECT + data, + id_veiculo, + status, + SAFE_CAST(JSON_VALUE(indicadores,"$.indicador_ar_condicionado") AS BOOL) AS indicador_ar_condicionado + FROM + {{ ref("sppo_veiculo_dia") }} + -- rj-smtr.veiculo.sppo_veiculo_dia + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ), + viagem AS ( + SELECT + data, + servico_realizado AS servico, + id_veiculo, + id_viagem, + datetime_partida + FROM + {{ ref("viagem_completa") }} + -- rj-smtr.projeto_subsidio_sppo.viagem_completa + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ), + ar_viagem AS ( + SELECT + v.data, + v.servico, + v.id_viagem, + v.datetime_partida, + COALESCE(ia.indicador_ar_condicionado, FALSE) AS indicador_ar_condicionado + FROM + viagem v + LEFT JOIN + indicador_ar ia + ON + ia.data = v.data + AND ia.id_veiculo = v.id_veiculo + ), + subsidio_servico_ar AS ( + SELECT + sfd.data, + sfd.tipo_dia, + sfd.faixa_horaria_inicio, + sfd.faixa_horaria_fim, + sfd.consorcio, + sfd.servico, + sfd.pof, + COALESCE(s.tipo_viagem, "Sem viagem apurada") AS tipo_viagem, + s.id_viagem, + s.distancia_planejada, + s.subsidio_km, + s.subsidio_km_teto, + s.indicador_viagem_dentro_limite, + CASE + WHEN sfd.pof < 60 THEN TRUE + ELSE s.indicador_penalidade_judicial + END AS indicador_penalidade_judicial, + COALESCE(av.indicador_ar_condicionado, FALSE) AS indicador_ar_condicionado + FROM + subsidio_faixa_dia AS sfd + LEFT JOIN + ar_viagem AS av + ON + sfd.data = av.data + AND sfd.servico = av.servico + AND av.datetime_partida BETWEEN sfd.faixa_horaria_inicio + AND sfd.faixa_horaria_fim + LEFT JOIN + servico_km_apuracao AS s + ON + sfd.data = s.data + AND sfd.servico = s.servico + AND s.id_viagem = av.id_viagem + ) +SELECT + data, + tipo_dia, + faixa_horaria_inicio, + faixa_horaria_fim, + consorcio, + servico, + indicador_ar_condicionado, + indicador_penalidade_judicial, + indicador_viagem_dentro_limite, + tipo_viagem, + SAFE_CAST(COALESCE(COUNT(id_viagem), 0) AS INT64) AS viagens_faixa, + SAFE_CAST(TRUNC(COALESCE(SUM(distancia_planejada), 0), 3)AS NUMERIC) AS km_apurada_faixa, + SAFE_CAST(TRUNC(COALESCE(SUM(IF(tipo_viagem != "Não licenciado", distancia_planejada, 0)), 0), 3) AS NUMERIC) AS km_subsidiada_faixa, + SAFE_CAST(TRUNC(SUM(IF(indicador_viagem_dentro_limite = TRUE AND pof >= 80, distancia_planejada*subsidio_km, 0)), 2) AS NUMERIC) AS valor_apurado, + SAFE_CAST(-TRUNC(COALESCE(SUM(IF(indicador_viagem_dentro_limite = TRUE, 0, distancia_planejada*subsidio_km)), 0), 2) AS NUMERIC) AS valor_acima_limite, + SAFE_CAST(TRUNC(SUM(IF(pof >= 80 AND tipo_viagem != "Não licenciado", distancia_planejada*subsidio_km_teto, 0)) - COALESCE(SUM(IF(indicador_viagem_dentro_limite = TRUE, 0, distancia_planejada*subsidio_km)), 0), 2) AS NUMERIC) AS valor_total_sem_glosa, + '{{ var("version") }}' as versao, + CURRENT_DATETIME("America/Sao_Paulo") as datetime_ultima_atualizacao +FROM + subsidio_servico_ar +GROUP BY + data, + tipo_dia, + faixa_horaria_inicio, + faixa_horaria_fim, + consorcio, + servico, + indicador_ar_condicionado, + indicador_penalidade_judicial, + indicador_viagem_dentro_limite, + tipo_viagem diff --git a/queries/models/financeiro/subsidio_penalidade_servico_dia.sql b/queries/models/financeiro/subsidio_penalidade_servico_dia.sql new file mode 100644 index 000000000..24e638ae4 --- /dev/null +++ b/queries/models/financeiro/subsidio_penalidade_servico_dia.sql @@ -0,0 +1,56 @@ +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + incremental_strategy="insert_overwrite", + ) +}} + +WITH + subsidio_dia AS ( + SELECT + data, + tipo_dia, + consorcio, + servico, + MIN(pof) AS min_pof + FROM + {{ ref("subsidio_faixa_servico_dia") }} + -- rj-smtr-dev.financeiro.subsidio_faixa_servico_dia + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + GROUP BY + data, + tipo_dia, + consorcio, + servico + ), + penalidade AS ( + SELECT + data_inicio, + data_fim, + perc_km_inferior, + perc_km_superior, + IFNULL(-valor, 0) AS valor_penalidade + FROM + {{ ref("valor_tipo_penalidade") }} + -- rj-smtr.dashboard_subsidio_sppo.valor_tipo_penalidade + ) +SELECT + s.data, + s.tipo_dia, + s.consorcio, + s.servico, + SAFE_CAST(COALESCE(pe.valor_penalidade, 0) AS NUMERIC) AS valor_penalidade, + '{{ var("version") }}' as versao, + CURRENT_DATETIME("America/Sao_Paulo") as datetime_ultima_atualizacao +FROM + subsidio_dia AS s +LEFT JOIN + penalidade AS pe +ON + s.data BETWEEN pe.data_inicio + AND pe.data_fim + AND s.min_pof >= pe.perc_km_inferior + AND s.min_pof < pe.perc_km_superior \ No newline at end of file diff --git a/queries/models/financeiro/subsidio_sumario_servico_dia_pagamento.sql b/queries/models/financeiro/subsidio_sumario_servico_dia_pagamento.sql new file mode 100644 index 000000000..6d0b4c0a1 --- /dev/null +++ b/queries/models/financeiro/subsidio_sumario_servico_dia_pagamento.sql @@ -0,0 +1,118 @@ +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + incremental_strategy="insert_overwrite", + ) +}} + +WITH + subsidio_dia AS ( + SELECT + data, + tipo_dia, + consorcio, + servico, + SUM(viagens_faixa) AS viagens_dia, + SUM(km_apurada_faixa) AS km_apurada_dia, + SUM(km_planejada_faixa) AS km_planejada_dia + FROM + {{ ref("subsidio_faixa_servico_dia") }} + -- rj-smtr.financeiro.subsidio_faixa_servico_dia + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + GROUP BY + data, + tipo_dia, + consorcio, + servico + ), + subsidio_parametros AS ( + SELECT + DISTINCT data_inicio, + data_fim, + status, + subsidio_km, + MAX(subsidio_km) OVER (PARTITION BY data_inicio, data_fim) AS subsidio_km_teto + FROM + {{ ref("subsidio_valor_km_tipo_viagem") }} + -- rj-smtr-staging.dashboard_subsidio_sppo_staging.subsidio_valor_km_tipo_viagem +), + penalidade AS ( + SELECT + data, + tipo_dia, + consorcio, + servico, + valor_penalidade + FROM + {{ ref("subsidio_penalidade_servico_dia") }} + ), + subsidio_dia_tipo_viagem AS ( + SELECT + * + FROM + {{ ref("subsidio_faixa_servico_dia_tipo_viagem") }} + -- rj-smtr.financeiro.subsidio_faixa_servico_dia_tipo_viagem + WHERE + data BETWEEN DATE("{{ var("start_date") }}") + AND DATE("{{ var("end_date") }}") + ), + valores_calculados AS ( + SELECT + s.data, + s.tipo_dia, + s.consorcio, + s.servico, + pe.valor_penalidade, + SUM(s.km_subsidiada_faixa) AS km_subsidiada_dia, + COALESCE(SUM(s.valor_acima_limite), 0) AS valor_acima_limite, + COALESCE(SUM(s.valor_total_sem_glosa), 0) AS valor_total_sem_glosa, + SUM(s.valor_apurado) + pe.valor_penalidade AS valor_total_com_glosa, + CASE + WHEN pe.valor_penalidade != 0 THEN -pe.valor_penalidade + ELSE SAFE_CAST(TRUNC((SUM(IF(indicador_viagem_dentro_limite = TRUE AND indicador_penalidade_judicial = TRUE, km_apurada_faixa*subsidio_km_teto, 0)) + - SUM(IF(indicador_viagem_dentro_limite = TRUE AND indicador_penalidade_judicial = TRUE, km_apurada_faixa*subsidio_km, 0))), 2) AS NUMERIC) + END AS valor_judicial, + FROM + subsidio_dia_tipo_viagem AS s + LEFT JOIN + penalidade AS pe + USING(data, tipo_dia, consorcio, servico) + LEFT JOIN + subsidio_parametros AS sp + ON + s.data BETWEEN sp.data_inicio + AND sp.data_fim + AND s.tipo_viagem = sp.status + GROUP BY + s.data, + s.tipo_dia, + s.consorcio, + s.servico, + pe.valor_penalidade + ) +SELECT + sd.data, + sd.tipo_dia, + sd.consorcio, + sd.servico, + sd.viagens_dia, + sd.km_apurada_dia, + vc.km_subsidiada_dia, + sd.km_planejada_dia, + vc.valor_total_com_glosa AS valor_a_pagar, + vc.valor_total_com_glosa - vc.valor_total_sem_glosa AS valor_glosado, + vc.valor_acima_limite, + vc.valor_total_sem_glosa, + vc.valor_acima_limite + vc.valor_penalidade + vc.valor_total_sem_glosa AS valor_total_apurado, + vc.valor_judicial, + vc.valor_penalidade, + '{{ var("version") }}' as versao, + CURRENT_DATETIME("America/Sao_Paulo") as datetime_ultima_atualizacao +FROM + subsidio_dia AS sd +LEFT JOIN + valores_calculados AS vc +USING(data, tipo_dia, consorcio, servico) \ No newline at end of file diff --git a/queries/models/gtfs/CHANGELOG.md b/queries/models/gtfs/CHANGELOG.md index b428b17e4..7f32babb4 100644 --- a/queries/models/gtfs/CHANGELOG.md +++ b/queries/models/gtfs/CHANGELOG.md @@ -2,6 +2,10 @@ ## [1.1.9] - 2024-09-10 +### Alterado + +- Altera modelo `ordem_servico_trips_shapes_gtfs` em razão da apuração por faixa horária (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + ### Corrigido - Corrigido `schema.yml` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/202) diff --git a/queries/models/gtfs/ordem_servico_trips_shapes_gtfs.sql b/queries/models/gtfs/ordem_servico_trips_shapes_gtfs.sql index 73e166c72..22cfcf0df 100644 --- a/queries/models/gtfs/ordem_servico_trips_shapes_gtfs.sql +++ b/queries/models/gtfs/ordem_servico_trips_shapes_gtfs.sql @@ -171,10 +171,25 @@ SELECT vista, consorcio, sentido, + CASE + WHEN feed_start_date >= var("DATA_SUBSIDIO_V9_INICIO") THEN fh.partidas + ELSE NULL + END AS partidas_total_planejada, distancia_planejada, - distancia_total_planejada, + CASE + WHEN feed_start_date >= var("DATA_SUBSIDIO_V9_INICIO") THEN fh.quilometragem + ELSE distancia_total_planejada + END AS distancia_total_planejada, inicio_periodo, fim_periodo, + CASE + WHEN feed_start_date >= var("DATA_SUBSIDIO_V9_INICIO") THEN fh.faixa_horaria_inicio + ELSE "00:00:00" + END AS faixa_horaria_inicio, + CASE + WHEN feed_start_date >= var("DATA_SUBSIDIO_V9_INICIO") THEN fh.faixa_horaria_fim + ELSE "23:59:59" + END AS faixa_horaria_fim, trip_id_planejado, trip_id, shape_id, @@ -197,6 +212,11 @@ USING (feed_version, feed_start_date, shape_id) +LEFT JOIN + {{ ref("ordem_servico_faixa_horaria") }} AS fh + -- rj-smtr-dev.gtfs.ordem_servico_faixa_horaria AS fh +USING + (feed_version, feed_start_date, tipo_os, tipo_dia, servico) {% if is_incremental() -%} WHERE feed_start_date = '{{ var("data_versao_gtfs") }}' diff --git a/queries/models/planejamento/CHANGELOG.md b/queries/models/planejamento/CHANGELOG.md new file mode 100644 index 000000000..e8a1a3a9e --- /dev/null +++ b/queries/models/planejamento/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - planejamento + +## [1.0.0] - 2024-08-29 + +### Adicionado + +- Cria modelo `ordem_servico_faixa_horaria` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) \ No newline at end of file diff --git a/queries/models/planejamento/ordem_servico_faixa_horaria.sql b/queries/models/planejamento/ordem_servico_faixa_horaria.sql new file mode 100644 index 000000000..f52bb72be --- /dev/null +++ b/queries/models/planejamento/ordem_servico_faixa_horaria.sql @@ -0,0 +1,163 @@ +{{ + config( + partition_by = { + "field": "feed_start_date", + "data_type": "date", + "granularity": "day" + }, + ) +}} + +WITH + dados AS ( + SELECT + SAFE_CAST(data_versao AS DATE) AS data_versao, + SAFE_CAST(tipo_os AS DATE) AS tipo_os, + SAFE_CAST(servico AS STRING) AS servico, + SAFE_CAST(JSON_VALUE(content, "$.consorcio") AS STRING) AS consorcio, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_00h_e_03h_dias_uteis') AS STRING) AS partidas_entre_00h_e_03h_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_00h_e_03h_dias_uteis') AS STRING) AS quilometragem_entre_00h_e_03h_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_03h_e_12h_dias_uteis') AS STRING) AS partidas_entre_03h_e_12h_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_03h_e_12h_dias_uteis') AS STRING) AS quilometragem_entre_03h_e_12h_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_12h_e_21h_dias_uteis') AS STRING) AS partidas_entre_12h_e_21h_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_12h_e_21h_dias_uteis') AS STRING) AS quilometragem_entre_12h_e_21h_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_21h_e_24h_dias_uteis') AS STRING) AS partidas_entre_21h_e_24h_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_21h_e_24h_dias_uteis') AS STRING) AS quilometragem_entre_21h_e_24h_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_24h_e_03h_diaseguinte_dias_uteis') AS STRING) AS partidas_entre_24h_e_03h_diaseguinte_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_24h_e_03h_diaseguinte_dias_uteis') AS STRING) AS quilometragem_entre_24h_e_03h_diaseguinte_dias_uteis, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_03h_e_12h_sabado') AS STRING) AS partidas_entre_03h_e_12h_sabado, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_03h_e_12h_sabado') AS STRING) AS quilometragem_entre_03h_e_12h_sabado, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_12h_e_21h_sabado') AS STRING) AS partidas_entre_12h_e_21h_sabado, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_12h_e_21h_sabado') AS STRING) AS quilometragem_entre_12h_e_21h_sabado, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_21h_e_24h_sabado') AS STRING) AS partidas_entre_21h_e_24h_sabado, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_21h_e_24h_sabado') AS STRING) AS quilometragem_entre_21h_e_24h_sabado, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_24h_e_03h_diaseguinte_sabado') AS STRING) AS partidas_entre_24h_e_03h_diaseguinte_sabado, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_24h_e_03h_diaseguinte_sabado') AS STRING) AS quilometragem_entre_24h_e_03h_diaseguinte_sabado, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_03h_e_12h_domingo') AS STRING) AS partidas_entre_03h_e_12h_domingo, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_03h_e_12h_domingo') AS STRING) AS quilometragem_entre_03h_e_12h_domingo, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_12h_e_21h_domingo') AS STRING) AS partidas_entre_12h_e_21h_domingo, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_12h_e_21h_domingo') AS STRING) AS quilometragem_entre_12h_e_21h_domingo, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_21h_e_24h_domingo') AS STRING) AS partidas_entre_21h_e_24h_domingo, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_21h_e_24h_domingo') AS STRING) AS quilometragem_entre_21h_e_24h_domingo, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_24h_e_03h_diaseguinte_domingo') AS STRING) AS partidas_entre_24h_e_03h_diaseguinte_domingo, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_24h_e_03h_diaseguinte_domingo') AS STRING) AS quilometragem_entre_24h_e_03h_diaseguinte_domingo, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_03h_e_12h_ponto_facultativo') AS STRING) AS partidas_entre_03h_e_12h_ponto_facultativo, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_03h_e_12h_ponto_facultativo') AS STRING) AS quilometragem_entre_03h_e_12h_ponto_facultativo, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_12h_e_21h_ponto_facultativo') AS STRING) AS partidas_entre_12h_e_21h_ponto_facultativo, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_12h_e_21h_ponto_facultativo') AS STRING) AS quilometragem_entre_12h_e_21h_ponto_facultativo, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_21h_e_24h_ponto_facultativo') AS STRING) AS partidas_entre_21h_e_24h_ponto_facultativo, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_21h_e_24h_ponto_facultativo') AS STRING) AS quilometragem_entre_21h_e_24h_ponto_facultativo, + SAFE_CAST(JSON_VALUE(content, '$.partidas_entre_24h_e_03h_diaseguinte_ponto_facultativo') AS STRING) AS partidas_entre_24h_e_03h_diaseguinte_ponto_facultativo, + SAFE_CAST(JSON_VALUE(content, '$.quilometragem_entre_24h_e_03h_diaseguinte_ponto_facultativo') AS STRING) AS quilometragem_entre_24h_e_03h_diaseguinte_ponto_facultativo + FROM + {{ source("br_rj_riodejaneiro_gtfs_staging", "ordem_servico_faixa_horaria") }} + {% if is_incremental() -%} + WHERE + data_versao = '{{ var("data_versao_gtfs") }}' + {%- endif %} + ), + dados_agrupados AS ( + SELECT + data_versao, + tipo_os, + servico, + consorcio, + CASE + WHEN column_name LIKE '%dias_uteis%' THEN 'Dia Útil' + WHEN column_name LIKE '%sabado%' THEN 'Sabado' + WHEN column_name LIKE '%domingo%' THEN 'Domingo' + WHEN column_name LIKE '%ponto_facultativo%' THEN 'Ponto Facultativo' + END AS tipo_dia, + CASE + WHEN column_name LIKE '%00h_e_03h%' THEN + '00:00:00' + WHEN column_name LIKE '%03h_e_12h%' THEN + '03:00:00' + WHEN column_name LIKE '%12h_e_21h%' THEN + '12:00:00' + WHEN column_name LIKE '%21h_e_24h%' THEN + '21:00:00' + WHEN column_name LIKE '%24h_e_03h_diaseguinte%' THEN + '24:00:00' + END AS faixa_horaria_inicio, + CASE + WHEN column_name LIKE '%00h_e_03h%' THEN + '02:59:59' + WHEN column_name LIKE '%03h_e_12h%' THEN + '11:59:59' + WHEN column_name LIKE '%12h_e_21h%' THEN + '20:59:59' + WHEN column_name LIKE '%21h_e_24h%' THEN + '23:59:59' + WHEN column_name LIKE '%24h_e_03h_diaseguinte%' THEN + '26:59:59' + END AS faixa_horaria_fim, + SUM(CASE + WHEN column_name LIKE '%partidas%' THEN SAFE_CAST(value AS INT64) + ELSE 0 + END) AS partidas, + SUM(CASE + WHEN column_name LIKE '%quilometragem%' THEN SAFE_CAST(value AS FLOAT64) + ELSE 0 + END) AS quilometragem + FROM dados + UNPIVOT ( + value FOR column_name IN ( + partidas_entre_00h_e_03h_dias_uteis, + quilometragem_entre_00h_e_03h_dias_uteis, + partidas_entre_03h_e_12h_dias_uteis, + quilometragem_entre_03h_e_12h_dias_uteis, + partidas_entre_12h_e_21h_dias_uteis, + quilometragem_entre_12h_e_21h_dias_uteis, + partidas_entre_21h_e_24h_dias_uteis, + quilometragem_entre_21h_e_24h_dias_uteis, + partidas_entre_24h_e_03h_diaseguinte_dias_uteis, + quilometragem_entre_24h_e_03h_diaseguinte_dias_uteis, + partidas_entre_03h_e_12h_sabado, + quilometragem_entre_03h_e_12h_sabado, + partidas_entre_12h_e_21h_sabado, + quilometragem_entre_12h_e_21h_sabado, + partidas_entre_21h_e_24h_sabado, + quilometragem_entre_21h_e_24h_sabado, + partidas_entre_24h_e_03h_diaseguinte_sabado, + quilometragem_entre_24h_e_03h_diaseguinte_sabado, + partidas_entre_03h_e_12h_domingo, + quilometragem_entre_03h_e_12h_domingo, + partidas_entre_12h_e_21h_domingo, + quilometragem_entre_12h_e_21h_domingo, + partidas_entre_21h_e_24h_domingo, + quilometragem_entre_21h_e_24h_domingo, + partidas_entre_24h_e_03h_diaseguinte_domingo, + quilometragem_entre_24h_e_03h_diaseguinte_domingo, + partidas_entre_03h_e_12h_ponto_facultativo, + quilometragem_entre_03h_e_12h_ponto_facultativo, + partidas_entre_12h_e_21h_ponto_facultativo, + quilometragem_entre_12h_e_21h_ponto_facultativo, + partidas_entre_21h_e_24h_ponto_facultativo, + quilometragem_entre_21h_e_24h_ponto_facultativo, + partidas_entre_24h_e_03h_diaseguinte_ponto_facultativo, + quilometragem_entre_24h_e_03h_diaseguinte_ponto_facultativo + ) + ) + GROUP BY 1, 2, 3, 4, 5, 6 +) +SELECT + fi.feed_version, + fi.feed_start_date, + fi.feed_end_date, + d.* EXCEPT(data_versao), + '{{ var("version") }}' AS versao_modelo +FROM + dados_agrupados AS d +LEFT JOIN + {{ ref('feed_info_gtfs') }} AS fi +ON + d.data_versao = fi.feed_start_date +{% if is_incremental() -%} +WHERE + d.data_versao = '{{ var("data_versao_gtfs") }}' + AND fi.feed_start_date = '{{ var("data_versao_gtfs") }}' +{% else %} +WHERE + d.data_versao >= var("DATA_SUBSIDIO_V9_INICIO") +{%- endif %} diff --git a/queries/models/planejamento/schema.yml b/queries/models/planejamento/schema.yml new file mode 100644 index 000000000..a352dc462 --- /dev/null +++ b/queries/models/planejamento/schema.yml @@ -0,0 +1,18 @@ +version: 2 + +models: + - name: ordem_servico_faixa_horaria + description: "Quantidade de viagens planejadas por faixa horária." + columns: + - name: servico + description: "{{ doc('servico') }}" + - name: tipo_dia + description: "{{ doc('tipo_dia') }}" + - name: faixa_horaria_inicio + description: "{{ doc('faixa_horaria_inicio') }}" + - name: faixa_horaria_fim + description: "{{ doc('faixa_horaria_fim') }}" + - name: partidas + description: "{{ doc('partidas') }}" + - name: quilometragem + description: "{{ doc('quilometragem') }}" \ No newline at end of file diff --git a/queries/models/projeto_subsidio_sppo/CHANGELOG.md b/queries/models/projeto_subsidio_sppo/CHANGELOG.md index b7bf3079d..cf9aacf87 100644 --- a/queries/models/projeto_subsidio_sppo/CHANGELOG.md +++ b/queries/models/projeto_subsidio_sppo/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog - projeto_subsidio_sppo +## [6.0.5] - 2024-08-29 + +### Alterado + +- Alterado os modelos `viagem_planejada` e `aux_registros_status_trajeto` em razão da apuração por faixa horária (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + +- Alterado modelo `subsidio_data_versao_efetiva` para materializar apenas 1 dia (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + ## [6.0.4] 2024-08-13 ### Adicionado diff --git a/queries/models/projeto_subsidio_sppo/aux_registros_status_trajeto.sql b/queries/models/projeto_subsidio_sppo/aux_registros_status_trajeto.sql index 97e059294..70f5bcd70 100644 --- a/queries/models/projeto_subsidio_sppo/aux_registros_status_trajeto.sql +++ b/queries/models/projeto_subsidio_sppo/aux_registros_status_trajeto.sql @@ -5,6 +5,8 @@ {% set gps_interval = 3 %} {% endif %} +{% if var("run_date") < var("DATA_SUBSIDIO_V9_INICIO") %} + -- 1. Seleciona sinais de GPS registrados no período with gps as ( select @@ -102,3 +104,106 @@ select '{{ var("version") }}' as versao_modelo from status_viagem + +{% else %} + +-- 1. Seleciona sinais de GPS registrados no período +with gps as ( + select + g.* except(longitude, latitude, servico), + servico, + substr(id_veiculo, 2, 3) as id_empresa, + ST_GEOGPOINT(longitude, latitude) posicao_veiculo_geo, + date_sub(date("{{ var("run_date") }}"), interval 1 day) as data_operacao + from + `rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo` g -- {{ ref('gps_sppo') }} g + where ( + data between date_sub(date("{{ var("run_date") }}"), interval 1 day) and date("{{ var("run_date") }}") + ) + -- Limita range de busca do gps de D-2 às 00h até D-1 às 3h + and ( + timestamp_gps between datetime_sub(datetime_trunc("{{ var("run_date") }}", day), interval 1 day) + and datetime_add(datetime_trunc("{{ var("run_date") }}", day), interval {{ gps_interval }} hour) + ) + and status != "Parado garagem" +), +-- 2. Busca os shapes em formato geográfico +shapes AS ( + SELECT + * + FROM + {{ ref("shapes_geom_gtfs") }} + WHERE + date_sub(date("{{ var("run_date") }}"), interval 1 day) BETWEEN feed_start_date AND feed_end_date +), +-- 3. Deduplica viagens planejadas +viagem_planejada AS ( + SELECT + DISTINCT * EXCEPT(faixa_horaria_inicio, faixa_horaria_fim, partidas_total_planejada, distancia_total_planejada, shape, start_pt, end_pt) + FROM + {{ ref("viagem_planejada") }} + WHERE + data = date_sub(date("{{ var("run_date") }}"), interval 1 day) +), +deduplica_viagem_planejada AS ( + SELECT + v.*, + s.shape, + s.start_pt, + s.end_pt + FROM + viagem_planejada AS v + LEFT JOIN + shapes AS s + USING + (feed_version, feed_start_date, shape_id) +), +-- 4. Classifica a posição do veículo em todos os shapes possíveis de +-- serviços de uma mesma empresa +status_viagem as ( + select + data_operacao as data, + g.id_veiculo, + g.id_empresa, + g.timestamp_gps, + timestamp_trunc(g.timestamp_gps, minute) as timestamp_minuto_gps, + g.posicao_veiculo_geo, + TRIM(g.servico, " ") as servico_informado, + s.servico as servico_realizado, + s.shape_id, + s.sentido_shape, + s.shape_id_planejado, + s.trip_id, + s.trip_id_planejado, + s.sentido, + s.start_pt, + s.end_pt, + s.distancia_planejada, + ifnull(g.distancia,0) as distancia, + case + when ST_DWITHIN(g.posicao_veiculo_geo, start_pt, {{ var("buffer") }}) + then 'start' + when ST_DWITHIN(g.posicao_veiculo_geo, end_pt, {{ var("buffer") }}) + then 'end' + when ST_DWITHIN(g.posicao_veiculo_geo, shape, {{ var("buffer") }}) + then 'middle' + else 'out' + end status_viagem + from + gps g + inner join ( + SELECT + * + FROM + deduplica_viagem_planejada + ) s + on + g.data_operacao = s.data + and g.servico = s.servico +) +select + *, + '{{ var("version") }}' as versao_modelo +from + status_viagem +{% endif %} \ No newline at end of file diff --git a/queries/models/projeto_subsidio_sppo/schema.yml b/queries/models/projeto_subsidio_sppo/schema.yml index a0be43fc8..c852d5723 100644 --- a/queries/models/projeto_subsidio_sppo/schema.yml +++ b/queries/models/projeto_subsidio_sppo/schema.yml @@ -33,6 +33,8 @@ models: (ida), V (volta), C (circular)" tests: - not_null + - name: partidas + description: "{{ doc('partidas') }}" - name: inicio_periodo description: "Início do período de operação planejado" tests: @@ -41,6 +43,10 @@ models: description: "Fim do período de operação planejado" tests: - not_null + - name: faixa_horaria_inicio + description: "{{ doc('faixa_horaria_inicio') }}" + - name: faixa_horaria_fim + description: "{{ doc('faixa_horaria_fim') }}" - name: intervalo description: "Intervalo de saída planejado" tests: diff --git a/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql b/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql index 18b6a9001..84d31abf5 100644 --- a/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql +++ b/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql @@ -364,11 +364,12 @@ WITH dates AS d LEFT JOIN {{ ref('feed_info_gtfs') }} AS i + -- rj-smtr.gtfs.feed_info AS i USING (feed_version) WHERE {% if is_incremental() %} - data BETWEEN DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) AND DATE("{{ var('run_date') }}") + data = DATE_SUB(DATE("{{ var("run_date") }}"), INTERVAL 1 DAY) {% else %} data <= DATE("{{ var('run_date') }}") {% endif %} @@ -388,6 +389,7 @@ FROM data_versao_efetiva_manual AS d LEFT JOIN {{ ref('feed_info_gtfs') }} AS i + -- rj-smtr.gtfs.feed_info AS i ON (data BETWEEN i.feed_start_date AND i.feed_end_date OR (data >= i.feed_start_date AND i.feed_end_date IS NULL)) diff --git a/queries/models/projeto_subsidio_sppo/viagem_planejada.sql b/queries/models/projeto_subsidio_sppo/viagem_planejada.sql index 6a00ee411..db08c4bb5 100644 --- a/queries/models/projeto_subsidio_sppo/viagem_planejada.sql +++ b/queries/models/projeto_subsidio_sppo/viagem_planejada.sql @@ -198,6 +198,13 @@ and p.data = s.data {% else %} +{% if execute %} + {% set result = run_query("SELECT tipo_os, feed_version, feed_start_date, tipo_dia FROM " ~ ref('subsidio_data_versao_efetiva') ~ " WHERE data BETWEEN DATE_SUB(DATE('" ~ var("run_date") ~ "'), INTERVAL 2 DAY) AND DATE_SUB(DATE('" ~ var("run_date") ~ "'), INTERVAL 1 DAY) ORDER BY feed_version") %} + {% set tipo_oss = result.columns[0].values() %} + {% set feed_versions = result.columns[1].values() %} + {% set feed_start_dates = result.columns[2].values() %} + {% set tipo_dias = result.columns[3].values() %} +{% endif %} WITH -- 1. Define datas do período planejado @@ -211,19 +218,75 @@ WITH tipo_os, FROM {{ ref("subsidio_data_versao_efetiva") }} - -- rj-smtr-dev.projeto_subsidio_sppo.subsidio_data_versao_efetiva + -- rj-smtr.projeto_subsidio_sppo.subsidio_data_versao_efetiva WHERE - data BETWEEN DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) AND DATE("{{ var('run_date') }}")) -SELECT + data BETWEEN DATE_SUB("{{ var('run_date') }}", INTERVAL 2 DAY) AND DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) + ), + -- 2. Busca partidas e quilometragem da faixa horaria (dia anterior) + dia_anterior AS ( + SELECT + COALESCE(d1.feed_version, feed_version) AS feed_version, + COALESCE(d1.feed_start_date, feed_start_date) AS feed_start_date, + feed_end_date, + tipo_os, + COALESCE(d1.tipo_dia, tipo_dia) AS tipo_dia, + servico, + vista, + consorcio, + sentido, + partidas_total_planejada, + distancia_planejada, + distancia_total_planejada, + inicio_periodo, + fim_periodo, + "00:00:00" AS faixa_horaria_inicio, + "02:59:59" AS faixa_horaria_fim, + trip_id_planejado, + trip_id, + shape_id, + shape_id_planejado, + shape, + sentido_shape, + start_pt, + end_pt, + id_tipo_trajeto, + FROM + {{ ref("ordem_servico_trips_shapes_gtfs") }} + -- rj-smtr.gtfs.ordem_servico_trips_shapes + LEFT JOIN + (SELECT * FROM data_versao_efetiva WHERE data = DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY)) as d1 + USING (feed_start_date, feed_version, tipo_dia, tipo_os) + WHERE + faixa_horaria_inicio = "24:00:00" + AND tipo_os = "{{ tipo_oss[0] }}" + AND feed_version = "{{ feed_versions[0] }}" + AND feed_start_date = DATE("{{ feed_start_dates[0] }}") + AND tipo_dia = "{{ tipo_dias[0] }}" + ), +combina_trips_shapes AS ( + SELECT * + FROM {{ ref("ordem_servico_trips_shapes_gtfs") }} + -- rj-smtr.gtfs.ordem_servico_trips_shapes + WHERE + tipo_os = "{{ tipo_oss[1] }}" + AND feed_version = "{{ feed_versions[1] }}" + AND feed_start_date = DATE("{{ feed_start_dates[1] }}") + AND tipo_dia = "{{ tipo_dias[1] }}" + UNION ALL + SELECT * + FROM dia_anterior +), +data_trips_shapes AS (SELECT d.data, CASE - WHEN subtipo_dia IS NOT NULL THEN CONCAT(tipo_dia, " - ", subtipo_dia) - ELSE tipo_dia + WHEN subtipo_dia IS NOT NULL THEN CONCAT(o.tipo_dia, " - ", subtipo_dia) + ELSE o.tipo_dia END AS tipo_dia, servico, vista, consorcio, sentido, + partidas_total_planejada, distancia_planejada, distancia_total_planejada, IF(inicio_periodo IS NOT NULL AND ARRAY_LENGTH(SPLIT(inicio_periodo, ":")) = 3, @@ -262,6 +325,70 @@ SELECT ), NULL ) AS fim_periodo, + IF(d.data >= DATE("{{ var("DATA_SUBSIDIO_V9_INICIO") }}"), + DATETIME_ADD( + DATETIME( + d.data, + PARSE_TIME("%T", + CONCAT( + SAFE_CAST(MOD(SAFE_CAST(SPLIT(o.faixa_horaria_inicio, ":")[OFFSET(0)] AS INT64), 24) AS INT64), + ":", + SAFE_CAST(SPLIT(o.faixa_horaria_inicio, ":")[OFFSET(1)] AS INT64), + ":", + SAFE_CAST(SPLIT(o.faixa_horaria_inicio, ":")[OFFSET(2)] AS INT64) + ) + ) + ), + INTERVAL DIV(SAFE_CAST(SPLIT(o.faixa_horaria_inicio, ":")[OFFSET(0)] AS INT64), 24) DAY + ), + DATETIME_ADD( + DATETIME( + d.data, + PARSE_TIME("%T", + CONCAT( + SAFE_CAST(MOD(SAFE_CAST(SPLIT("00:00:00", ":")[OFFSET(0)] AS INT64), 24) AS INT64), + ":", + SAFE_CAST(SPLIT("00:00:00", ":")[OFFSET(1)] AS INT64), + ":", + SAFE_CAST(SPLIT("00:00:00", ":")[OFFSET(2)] AS INT64) + ) + ) + ), + INTERVAL DIV(SAFE_CAST(SPLIT("00:00:00", ":")[OFFSET(0)] AS INT64), 24) DAY + ) + ) AS faixa_horaria_inicio, + IF(d.data >= DATE("{{ var("DATA_SUBSIDIO_V9_INICIO") }}"), + DATETIME_ADD( + DATETIME( + d.data, + PARSE_TIME("%T", + CONCAT( + SAFE_CAST(MOD(SAFE_CAST(SPLIT(o.faixa_horaria_fim, ":")[OFFSET(0)] AS INT64), 24) AS INT64), + ":", + SAFE_CAST(SPLIT(o.faixa_horaria_fim, ":")[OFFSET(1)] AS INT64), + ":", + SAFE_CAST(SPLIT(o.faixa_horaria_fim, ":")[OFFSET(2)] AS INT64) + ) + ) + ), + INTERVAL DIV(SAFE_CAST(SPLIT(o.faixa_horaria_fim, ":")[OFFSET(0)] AS INT64), 24) DAY + ), + DATETIME_ADD( + DATETIME( + d.data, + PARSE_TIME("%T", + CONCAT( + SAFE_CAST(MOD(SAFE_CAST(SPLIT("23:59:59", ":")[OFFSET(0)] AS INT64), 24) AS INT64), + ":", + SAFE_CAST(SPLIT("23:59:59", ":")[OFFSET(1)] AS INT64), + ":", + SAFE_CAST(SPLIT("23:59:59", ":")[OFFSET(2)] AS INT64) + ) + ) + ), + INTERVAL DIV(SAFE_CAST(SPLIT("23:59:59", ":")[OFFSET(0)] AS INT64), 24) DAY + ) + ) AS faixa_horaria_fim, trip_id_planejado, trip_id, shape_id, @@ -273,19 +400,90 @@ SELECT end_pt, id_tipo_trajeto, feed_version, - CURRENT_DATETIME("America/Sao_Paulo") AS datetime_ultima_atualizacao + feed_start_date FROM data_versao_efetiva AS d LEFT JOIN - {{ ref("ordem_servico_trips_shapes_gtfs") }} AS o + combina_trips_shapes AS o +USING (feed_start_date, feed_version, tipo_dia, tipo_os) +WHERE + data = DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) + AND faixa_horaria_inicio != "24:00:00" +), +shapes AS ( + SELECT + * + FROM + {{ ref("shapes_geom_gtfs") }} + -- rj-smtr.gtfs.shapes_geom + WHERE + feed_start_date IN (SELECT feed_start_date FROM data_versao_efetiva WHERE data BETWEEN DATE_SUB("{{ var('run_date') }}", INTERVAL 2 DAY) AND DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY)) +), +dados_agregados AS ( +SELECT + data, + tipo_dia, + servico, + vista, + consorcio, + sentido, + SUM(COALESCE(partidas_total_planejada, 0)) AS partidas_total_planejada, + distancia_planejada, + SUM(distancia_total_planejada) AS distancia_total_planejada, + inicio_periodo, + fim_periodo, + faixa_horaria_inicio, + faixa_horaria_fim, + trip_id_planejado, + trip_id, + shape_id, + shape_id_planejado, + data_shape, + sentido_shape, + id_tipo_trajeto, + feed_version, + feed_start_date +FROM + data_trips_shapes +GROUP BY + data, tipo_dia, servico, vista, consorcio, sentido, distancia_planejada, inicio_periodo, fim_periodo, faixa_horaria_inicio, faixa_horaria_fim, trip_id_planejado, trip_id, shape_id, shape_id_planejado, data_shape, sentido_shape, id_tipo_trajeto, feed_version, feed_start_date +) +SELECT + data, + tipo_dia, + servico, + vista, + consorcio, + sentido, + partidas_total_planejada, + distancia_planejada, + distancia_total_planejada, + inicio_periodo, + fim_periodo, + faixa_horaria_inicio, + faixa_horaria_fim, + trip_id_planejado, + trip_id, + shape_id, + shape_id_planejado, + data_shape, + s.shape, + sentido_shape, + s.start_pt, + s.end_pt, + id_tipo_trajeto, + feed_version, + feed_start_date, + CURRENT_DATETIME("America/Sao_Paulo") AS datetime_ultima_atualizacao +FROM + dados_agregados +LEFT JOIN + shapes AS s USING - (feed_start_date, - feed_version, - tipo_dia, - tipo_os) - {% if var("run_date") == "2024-05-05" %} - -- Apuração "Madonna · The Celebration Tour in Rio" - WHERE - servico != "SE001" + (feed_version, feed_start_date, shape_id) +{% if var("run_date") == "2024-05-05" %} + -- Apuração "Madonna · The Celebration Tour in Rio" +WHERE + AND servico != "SE001" {% endif %} {% endif %} \ No newline at end of file diff --git a/queries/models/projeto_subsidio_sppo_encontro_contas/CHANGELOG.md b/queries/models/projeto_subsidio_sppo_encontro_contas/CHANGELOG.md index 718c49111..0d99c1a5b 100644 --- a/queries/models/projeto_subsidio_sppo_encontro_contas/CHANGELOG.md +++ b/queries/models/projeto_subsidio_sppo_encontro_contas/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - projeto_subsidio_sppo_encontro_contas +## [1.0.4] - 2024-08-29 + +### Alterado + +- Alterado os modelos `balanco_servico_dia` e `balanco_servico_dia_pos_gt` em razão de alterações no modelo `viagens_remuneradas` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/114) + ## [1.0.3] - 2024-06-07 ### Adicionado diff --git a/queries/models/projeto_subsidio_sppo_encontro_contas/balanco_servico_dia.sql b/queries/models/projeto_subsidio_sppo_encontro_contas/balanco_servico_dia.sql index 20b4d20a0..9e5f96889 100644 --- a/queries/models/projeto_subsidio_sppo_encontro_contas/balanco_servico_dia.sql +++ b/queries/models/projeto_subsidio_sppo_encontro_contas/balanco_servico_dia.sql @@ -65,7 +65,7 @@ sumario_dia AS ( -- Km apurada por servico e dia WHERE DATA BETWEEN "2023-09-16" AND "2023-12-31" - AND indicador_viagem_remunerada = TRUE -- useless + AND indicador_viagem_dentro_limite = TRUE -- useless GROUP BY 1, 2 ), diff --git a/queries/models/projeto_subsidio_sppo_encontro_contas/balanco_servico_dia_pos_gt.sql b/queries/models/projeto_subsidio_sppo_encontro_contas/balanco_servico_dia_pos_gt.sql index 5029c6382..75f597255 100644 --- a/queries/models/projeto_subsidio_sppo_encontro_contas/balanco_servico_dia_pos_gt.sql +++ b/queries/models/projeto_subsidio_sppo_encontro_contas/balanco_servico_dia_pos_gt.sql @@ -65,7 +65,7 @@ sumario_dia AS ( -- Km apurada por servico e dia WHERE data BETWEEN "2023-09-16" AND "2023-12-31" - AND indicador_viagem_remunerada = TRUE -- useless + AND indicador_viagem_dentro_limite = TRUE -- useless GROUP BY 1, 2 ), diff --git a/queries/models/veiculo/sppo_veiculo_dia.sql b/queries/models/veiculo/sppo_veiculo_dia.sql index d05e761e8..779df690a 100644 --- a/queries/models/veiculo/sppo_veiculo_dia.sql +++ b/queries/models/veiculo/sppo_veiculo_dia.sql @@ -37,7 +37,8 @@ WITH DISTINCT data, id_veiculo FROM - {{ ref("gps_sppo") }} -- `rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo` + {{ ref("gps_sppo") }} + -- rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo WHERE data = DATE("{{ var('run_date') }}") ), autuacoes AS ( @@ -61,6 +62,7 @@ WITH TRUE AS indicador_registro_agente_verao_ar_condicionado FROM {{ ref("sppo_registro_agente_verao") }} + -- rj-smtr.veiculo.sppo_registro_agente_verao WHERE data = DATE("{{ var('run_date') }}") ), autuacao_ar_condicionado AS ( diff --git a/queries/selectors.yml b/queries/selectors.yml new file mode 100644 index 000000000..34c75b302 --- /dev/null +++ b/queries/selectors.yml @@ -0,0 +1,26 @@ +selectors: + - name: apuracao_subsidio_v8 + description: Apuração sem faixa horária + definition: + union: + - method: path + value: models/dashboard_subsidio_sppo_staging + - method: fqn + value: viagem_transacao + - method: path + value: models/dashboard_subsidio_sppo + + - name: apuracao_subsidio_v9 + description: Apuração com faixa horária + definition: + union: + - method: path + value: models/dashboard_subsidio_sppo_staging + - method: fqn + value: viagem_transacao + - method: fqn + value: viagens_remuneradas + - method: path + value: models/financeiro + - method: path + value: models/dashboard_subsidio_sppo_v2 \ No newline at end of file