diff --git a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py index 41e9d9cb..a43f302b 100644 --- a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py @@ -2,7 +2,7 @@ """ Flows for br_rj_riodejaneiro_bilhetagem -DBT: 2024-12-12 +DBT: 2025-01-06 """ from copy import deepcopy diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md index 37b2f288..fc4bd135 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog - gtfs +## [1.2.0] - 2025-01-03 + +### Adicionado +- Adicionado schedule de 5 minutos do flow de captura do gtfs (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379) + +- Adicionado parâmetros personalizados de execução no arquivo `flows.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379) + +### Removido +- Removido o teste de quantidade de abas na planilha da Ordem de Serviço (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379) + +## [1.1.9] - 2025-01-02 + +### Alterado +- Remove teste de verificação de quilometragem da task `processa_ordem_servico` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/377) + ## [1.1.8] - 2024-12-30 ### Alterado diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py index 68c53aa6..43a11702 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py @@ -45,8 +45,7 @@ upload_raw_data_to_gcs, upload_staging_data_to_gcs, ) - -# from pipelines.schedules import every_5_minutes +from pipelines.schedules import every_5_minutes from pipelines.tasks import ( check_fail, get_scheduled_timestamp, @@ -287,13 +286,17 @@ gtfs_captura_nova.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_AGENT_LABEL.value], + cpu_limit="1000m", + memory_limit="4600Mi", + cpu_request="500m", + memory_request="1000Mi", ) gtfs_captura_nova.state_handlers = [ handler_inject_bd_credentials, handler_initialize_sentry, handler_skip_if_running, ] -# gtfs_captura_nova.schedule = every_5_minutes +gtfs_captura_nova.schedule = every_5_minutes # with Flow( diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py index eef9515d..0f106d98 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py @@ -204,8 +204,13 @@ def processa_ordem_servico( None """ - if len(sheetnames) != 3 and regular_sheet_index is None: - raise Exception("More than 3 tabs in the file. Please specify the regular sheet index.") + if ( + len([sheet for sheet in sheetnames if "ANEXO I:" in sheet]) != 1 + and regular_sheet_index is None + ): + raise Exception( + "More than 1 regular sheet in the file. Please specify the regular sheet index." + ) if regular_sheet_index is None: regular_sheet_index = next( @@ -307,19 +312,19 @@ def processa_ordem_servico( if not all_columns_present or not no_duplicate_columns: raise Exception("Missing or duplicated columns in ordem_servico") - quadro_test = quadro_geral.copy() - quadro_test["km_test"] = round( - (quadro_geral["partidas_volta_du"] * quadro_geral["extensao_volta"]) - + (quadro_geral["partidas_ida_du"] * quadro_geral["extensao_ida"]), - 2, - ) - quadro_test["dif"] = quadro_test["km_test"] - quadro_test["km_dia_util"] - - if not ( - round(abs(quadro_test["dif"].max()), 2) <= 0.01 - and round(abs(quadro_test["dif"].min()), 2) <= 0.01 - ): - raise Exception("failed to validate km_test and km_dia_util") + # quadro_test = quadro_geral.copy() + # quadro_test["km_test"] = round( + # (quadro_geral["partidas_volta_du"] * quadro_geral["extensao_volta"]) + # + (quadro_geral["partidas_ida_du"] * quadro_geral["extensao_ida"]), + # 2, + # ) + # quadro_test["dif"] = quadro_test["km_test"] - quadro_test["km_dia_util"] + + # if not ( + # round(abs(quadro_test["dif"].max()), 2) <= 0.01 + # and round(abs(quadro_test["dif"].min()), 2) <= 0.01 + # ): + # raise Exception("failed to validate km_test and km_dia_util") local_file_path = list(filter(lambda x: "ordem_servico" in x, local_filepath))[0] quadro_geral_csv = quadro_geral.to_csv(index=False) diff --git a/pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md b/pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md index b44ffb47..df460614 100644 --- a/pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md +++ b/pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - projeto_subsidio_sppo +## [1.0.7] - 2025-01-03 + +### Adicionado + +- Adiciona a materialização dos modelos do dataset `monitoramento` ao flow do subsídio (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/372) + ## [1.0.6] - 2024-12-17 ### Adicionado diff --git a/pipelines/migration/projeto_subsidio_sppo/flows.py b/pipelines/migration/projeto_subsidio_sppo/flows.py index 28edb52a..7fbb820e 100644 --- a/pipelines/migration/projeto_subsidio_sppo/flows.py +++ b/pipelines/migration/projeto_subsidio_sppo/flows.py @@ -3,7 +3,7 @@ """ Flows for projeto_subsidio_sppo -DBT: 2024-12-20 +DBT: 2025-01-06 """ from prefect import Parameter, case, task @@ -275,6 +275,18 @@ _vars=dbt_vars_2, upstream_tasks=[dbt_vars_2], ) + dbt_vars_3 = get_join_dict( + dict_list=[dbt_vars_1], + new_dict=date_intervals["second_range"] + | {"tipo_materializacao": "monitoramento"}, + upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN_2], + )[0] + + SUBSIDIO_SPPO_APURACAO_RUN_3 = run_dbt_selector( + selector_name="monitoramento_subsidio", + _vars=dbt_vars_3, + upstream_tasks=[dbt_vars_3], + ) # POST-DATA QUALITY CHECK # SUBSIDIO_SPPO_DATA_QUALITY_POS_2 = run_dbt_tests( @@ -323,6 +335,11 @@ _vars=_vars, ) + SUBSIDIO_SPPO_APURACAO_RUN_2 = run_dbt_selector( + selector_name="monitoramento_subsidio", + _vars=_vars | {"tipo_materializacao": "monitoramento"}, + upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN], + ) # POST-DATA QUALITY CHECK # SUBSIDIO_SPPO_DATA_QUALITY_POS = run_dbt_tests( dataset_id="viagens_remuneradas sumario_servico_dia_pagamento", @@ -373,6 +390,7 @@ dataset_id="sppo_registros sppo_realocacao check_gps_treatment__gps_sppo sppo_veiculo_dia", # noqa _vars=dbt_vars, ) + DATA_QUALITY_PRE = dbt_data_quality_checks( dbt_logs=SUBSIDIO_SPPO_DATA_QUALITY_PRE, checks_list={}, diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index 6d03be30..ed8796ff 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -107,6 +107,7 @@ vars: subsidio_parametros: "`rj-smtr-staging.dashboard_subsidio_sppo_staging.subsidio_parametros`" shapes_version: "YYYY-MM-DD" frequencies_version: "YYYY-MM-DD" + tipo_materializacao: "subsidio" # Feature penalidade de autuação por inoperância do ar condicionado (DECRETO RIO 51940/2023) DATA_SUBSIDIO_V2_INICIO: "2023-01-16" # Feature penalidade de autuação por segurança e limpeza/equipamento (DECRETO RIO 52820/2023) diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md index 5e365f46..191c24ad 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - bilhetagem +## [2.0.3] - 2025-01-02 + +### Corrigido + +- Altera incremental strategy do modelo `integracao.sql` para `insert_overwrite` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/375) + ## [2.0.2] - 2024-12-30 ### Removido diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql index fd41b477..0e8feebc 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql @@ -2,11 +2,57 @@ {{ config( materialized="incremental", + incremental_strategy="insert_overwrite", partition_by={"field": "data", "data_type": "date", "granularity": "day"}, - unique_key="id_transacao", ) }} +{% set incremental_filter %} + date(data) between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + and timestamp_captura + between datetime("{{var('date_range_start')}}") and datetime( + "{{var('date_range_end')}}" + ) +{% endset %} + +{% set integracao_staging = ref("staging_integracao_transacao") %} + +{% if execute %} + {% if is_incremental() %} + {% set integracao_partitions_query %} + WITH integracao AS ( + SELECT DISTINCT + CONCAT("'", DATE(data_transacao), "'") AS data_transacao + FROM + {{ integracao_staging }}, + UNNEST([ + data_transacao_t0, + data_transacao_t1, + data_transacao_t2, + data_transacao_t3, + data_transacao_t4 + ]) AS data_transacao + WHERE + {{ incremental_filter }} + ) + SELECT + * + FROM + integracao + WHERE + data_transacao IS NOT NULL + + {% endset %} + + {% set integracao_partitions = ( + run_query(integracao_partitions_query).columns[0].values() + ) %} + + {% endif %} +{% endif %} + with integracao_transacao_deduplicada as ( select * except (rn) @@ -17,19 +63,9 @@ with row_number() over ( partition by id order by timestamp_captura desc ) as rn - from - {{ ref("staging_integracao_transacao") }} - {# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.integracao_transacao` #} - {% if is_incremental() -%} - where - date(data) between date("{{var('date_range_start')}}") and date( - "{{var('date_range_end')}}" - ) - and timestamp_captura - between datetime("{{var('date_range_start')}}") and datetime( - "{{var('date_range_end')}}" - ) - {%- endif %} + from {{ integracao_staging }} + {# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.integracao_transacao` #} + {% if is_incremental() -%} where {{ incremental_filter }} {%- endif %} ) where rn = 1 ), @@ -77,7 +113,7 @@ with ] ) as im ), - integracao_rn as ( + integracao_new as ( select i.data, i.hora, @@ -109,39 +145,53 @@ with i.valor_transacao, i.valor_transacao_total, i.texto_adicional, - '{{ var("version") }}' as versao, - row_number() over ( - partition by id_integracao, id_transacao - order by datetime_processamento_integracao desc - ) as rn + '{{ var("version") }}' as versao from integracao_melt i left join {{ source("cadastro", "modos") }} m on i.id_tipo_modal = m.id_modo and m.fonte = "jae" - left join - {{ ref("operadoras") }} do on i.id_operadora = do.id_operadora_jae - {# `rj-smtr.cadastro.operadoras` do on i.id_operadora = do.id_operadora_jae #} - left join - {{ ref("consorcios") }} dc on i.id_consorcio = dc.id_consorcio_jae - {# `rj-smtr.cadastro.consorcios` dc on i.id_consorcio = dc.id_consorcio_jae #} + left join {{ ref("operadoras") }} do on i.id_operadora = do.id_operadora_jae + {# `rj-smtr.cadastro.operadoras` do on i.id_operadora = do.id_operadora_jae #} + left join {{ ref("consorcios") }} dc on i.id_consorcio = dc.id_consorcio_jae + {# `rj-smtr.cadastro.consorcios` dc on i.id_consorcio = dc.id_consorcio_jae #} left join {{ ref("staging_linha") }} l {# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.linha` l #} on i.id_linha = l.cd_linha where i.id_transacao is not null ), + complete_partitions as ( + select *, 0 as priority + from integracao_new + + {% if is_incremental() %} + union all + + select *, 1 as priority + from {{ this }} + where + {% if integracao_partitions | length > 0 %} + data in ({{ integracao_partitions | join(", ") }}) + {% else %} data = "2000-01-01" + {% endif %} + {% endif %} + ), integracoes_teste_invalidas as ( select distinct i.id_integracao - from integracao_rn i + from complete_partitions i left join {{ ref("staging_linha_sem_ressarcimento") }} l {# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.linha_sem_ressarcimento` l #} on i.id_servico_jae = l.id_linha where l.id_linha is not null or i.data < "2023-07-17" ) -select * except (rn) -from integracao_rn -where - rn = 1 - and id_integracao not in (select id_integracao from integracoes_teste_invalidas) +select * except (priority) +from complete_partitions +where id_integracao not in (select id_integracao from integracoes_teste_invalidas) +qualify + row_number() over ( + partition by id_integracao, id_transacao, priority + order by datetime_processamento_integracao desc + ) + = 1 \ No newline at end of file diff --git a/queries/models/monitoramento/CHANGELOG.md b/queries/models/monitoramento/CHANGELOG.md index 8df2f9f0..259f4f63 100644 --- a/queries/models/monitoramento/CHANGELOG.md +++ b/queries/models/monitoramento/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog - monitoramento +## [1.2.1] - 2025-01-03 + +### Adicionado +- Cria modelo `monitoramento_viagem_transacao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/372) + ## [1.2.0] - 2024-11-28 ### Adicionado diff --git a/queries/models/monitoramento/monitoramento_viagem_transacao.sql b/queries/models/monitoramento/monitoramento_viagem_transacao.sql new file mode 100644 index 00000000..61a41003 --- /dev/null +++ b/queries/models/monitoramento/monitoramento_viagem_transacao.sql @@ -0,0 +1,26 @@ +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + incremental_strategy="insert_overwrite", + ) +}} + +select + data, + id_viagem, + id_veiculo, + id_validador, + servico, + tipo_viagem, + sentido, + distancia_planejada, + quantidade_transacao, + quantidade_transacao_riocard, + percentual_estado_equipamento_aberto, + indicador_estado_equipamento_aberto, + datetime_partida_bilhetagem, + datetime_partida, + datetime_chegada, + datetime_ultima_atualizacao +from {{ ref('viagem_transacao_aux') }} diff --git a/queries/models/planejamento/schema.yml b/queries/models/planejamento/schema.yml index bcc48243..d8a4d572 100644 --- a/queries/models/planejamento/schema.yml +++ b/queries/models/planejamento/schema.yml @@ -25,22 +25,22 @@ models: description: "{{ doc('faixa_horaria_fim') }}" - name: partidas description: "{{ doc('partidas') }}" - tests: - - dbt_expectations.expect_column_sum_to_be_between: - name: dbt_expectations.expect_column_sum_to_be_between__partidas__ordem_servico_faixa_horaria - min_value: 0 - group_by: [ feed_start_date, tipo_os, tipo_dia, servico ] - strictly: true - where: "feed_start_date = '{{ var('data_versao_gtfs') }}'" + # tests: + # - dbt_expectations.expect_column_sum_to_be_between: + # name: dbt_expectations.expect_column_sum_to_be_between__partidas__ordem_servico_faixa_horaria + # min_value: 0 + # group_by: [ feed_start_date, tipo_os, tipo_dia, servico ] + # strictly: true + # where: "feed_start_date = '{{ var('data_versao_gtfs') }}'" - name: quilometragem description: "{{ doc('quilometragem') }}" - tests: - - dbt_expectations.expect_column_sum_to_be_between: - name: dbt_expectations.expect_column_sum_to_be_between__quilometragem__ordem_servico_faixa_horaria - min_value: 0 - group_by: [ feed_start_date, tipo_os, tipo_dia, servico ] - strictly: true - where: "feed_start_date = '{{ var('data_versao_gtfs') }}'" + # tests: + # - dbt_expectations.expect_column_sum_to_be_between: + # name: dbt_expectations.expect_column_sum_to_be_between__quilometragem__ordem_servico_faixa_horaria + # min_value: 0 + # group_by: [ feed_start_date, tipo_os, tipo_dia, servico ] + # strictly: true + # where: "feed_start_date = '{{ var('data_versao_gtfs') }}'" - name: segmento_shape description: Tabela contendo os shapes segmentados usados na validação de viagens. columns: diff --git a/queries/models/projeto_subsidio_sppo/CHANGELOG.md b/queries/models/projeto_subsidio_sppo/CHANGELOG.md index 677ca808..ae9763f3 100644 --- a/queries/models/projeto_subsidio_sppo/CHANGELOG.md +++ b/queries/models/projeto_subsidio_sppo/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog - projeto_subsidio_sppo +## [9.1.4] - 2025-01-06 + +### Adicionado + +- Adicionadas datas com os tipo os `Fim de ano`, `Reveillon` e tipo dia `Ponto facultativo` no modelo `subsidio_data_versao_efetiva.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/383) + +## [9.1.3] - 2025-01-03 + +### Alterado + +- Alterado a data final no modelo `subsidio_data_versao_efetiva.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/378) + ## [9.1.2] - 2024-12-18 ### Alterado diff --git a/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql b/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql index 17b75813..d6810c96 100644 --- a/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql +++ b/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql @@ -325,6 +325,7 @@ WITH WHEN data = "2024-10-21" THEN "Ponto Facultativo" -- Ponto Facultativo - Dia do Comérciario - (Processo.Rio MTR-DES-2024/64171) WHEN data = "2024-10-28" THEN "Ponto Facultativo" -- Ponto Facultativo - Dia do Servidor Público - (Processo.Rio MTR-DES-2024/64417) WHEN data BETWEEN DATE(2024,11,18) AND DATE(2024,11,19) THEN "Ponto Facultativo" -- Ponto Facultativo - G20 - (Processo.Rio MTR-DES-2024/67477) + WHEN data = DATE(2024,12,24) THEN "Ponto Facultativo" -- Ponto Facultativo - Véspera de Natal - (Processo.Rio MTR-DES-2024/75723) WHEN EXTRACT(DAY FROM data) = 20 AND EXTRACT(MONTH FROM data) = 1 THEN "Domingo" -- Dia de São Sebastião -- Art. 8°, I - Lei Municipal nº 5146/2010 WHEN EXTRACT(DAY FROM data) = 23 AND EXTRACT(MONTH FROM data) = 4 THEN "Domingo" -- Dia de São Jorge -- Art. 8°, II - Lei Municipal nº 5146/2010 / Lei Estadual Nº 5198/2008 / Lei Estadual Nº 5645/2010 WHEN EXTRACT(DAY FROM data) = 20 AND EXTRACT(MONTH FROM data) = 11 THEN "Domingo" -- Aniversário de morte de Zumbi dos Palmares / Dia da Consciência Negra -- Art. 8°, IV - Lei Municipal nº 5146/2010 / Lei Estadual nº 526/1982 / Lei Estadual nº 1929/1991 / Lei Estadual nº 4007/2002 / Lei Estadual Nº 5645/2010 @@ -359,9 +360,15 @@ WITH WHEN data = DATE(2024,11,24) THEN "Parada LGBTQI+" -- Processo.Rio MTR-DES-2024/70057 WHEN data BETWEEN DATE(2024,12,07) AND DATE(2024,12,08) THEN "Extraordinária - Verão" -- Processo.Rio MTR-DES-2024/72800 WHEN data BETWEEN DATE(2024,12,14) AND DATE(2024,12,15) THEN "Extraordinária - Verão" -- Processo.Rio MTR-DES-2024/74396 + WHEN data = DATE(2024,12,23) THEN "Fim de ano" -- Processo.Rio MTR-DES-2024/75723 + WHEN data BETWEEN DATE(2024,12,26) AND DATE(2024,12,27) THEN "Fim de ano" -- Processo.Rio MTR-DES-2024/75723 + WHEN data = DATE(2024,12,30) THEN "Fim de ano" -- Processo.Rio MTR-DES-2024/75723 + WHEN data = DATE(2024,12,31) THEN "Vespera de Reveillon" -- Processo.Rio MTR-DES-2024/76453 + WHEN data = DATE(2025,01,01) THEN "Reveillon" -- Processo.Rio MTR-DES-2024/76453 + WHEN data BETWEEN DATE(2025,01,02) AND DATE(2025,01,03) THEN "Fim de ano" -- Processo.Rio MTR-DES-2024/77046 ELSE "Regular" END AS tipo_os, - FROM UNNEST(GENERATE_DATE_ARRAY("{{var('DATA_SUBSIDIO_V6_INICIO')}}", "2024-12-31")) AS data), + FROM UNNEST(GENERATE_DATE_ARRAY("{{var('DATA_SUBSIDIO_V6_INICIO')}}", "2025-12-31")) AS data), data_versao_efetiva_manual AS ( SELECT data, @@ -369,6 +376,7 @@ WITH CASE WHEN tipo_os = "Extraordinária - Verão" THEN "Verão" WHEN tipo_os LIKE "%Madonna%" THEN "Madonna" + WHEN tipo_os LIKE "%Reveillon%" THEN "Reveillon" WHEN tipo_os = "Regular" THEN NULL ELSE tipo_os END AS subtipo_dia, diff --git a/queries/models/subsidio/CHANGELOG.md b/queries/models/subsidio/CHANGELOG.md index 6a60dff1..b53946e5 100644 --- a/queries/models/subsidio/CHANGELOG.md +++ b/queries/models/subsidio/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog - subsidio +## [2.0.1] - 2025-01-06 + +# Corrigido + +- Corrigido e refatorado o modelo `viagem_transacao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/372) + +# Adicionado + +- Adicionado o modelo `viagem_transacao_aux.sql`(https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/372) + ## [2.0.0] - 2024-12-06 # Corrigido diff --git a/queries/models/subsidio/viagem_transacao.sql b/queries/models/subsidio/viagem_transacao.sql index eba9bd43..609a8b8b 100644 --- a/queries/models/subsidio/viagem_transacao.sql +++ b/queries/models/subsidio/viagem_transacao.sql @@ -6,304 +6,20 @@ ) }} -with - -- 1. Transações Jaé - transacao as ( - select id_veiculo, datetime_transacao - from {{ ref("transacao") }} - -- rj-smtr.br_rj_riodejaneiro_bilhetagem.transacao - where - data between date("{{ var('start_date') }}") and date_add( - date("{{ var('end_date') }}"), interval 1 day - ) - and date(datetime_processamento) - <= date_add(date("{{ var('end_date') }}"), interval 6 day) - ), - -- 2. Transações RioCard - transacao_riocard as ( - select id_veiculo, datetime_transacao - from {{ ref("transacao_riocard") }} - -- rj-smtr.br_rj_riodejaneiro_bilhetagem.transacao_riocard - where - data between date("{{ var('start_date') }}") and date_add( - date("{{ var('end_date') }}"), interval 1 day - ) - and date(datetime_processamento) - <= date_add(date("{{ var('end_date') }}"), interval 6 day) - ), - -- 3. GPS Validador - gps_validador as ( - select - data, - datetime_gps, - id_veiculo, - id_validador, - estado_equipamento, - latitude, - longitude - from {{ ref("gps_validador") }} - -- rj-smtr.br_rj_riodejaneiro_bilhetagem.gps_validador - where - data between date("{{ var('start_date') }}") and date_add( - date("{{ var('end_date') }}"), interval 1 day - ) - and ( - ( - data < date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") - and (latitude != 0 or longitude != 0) - ) - or data >= date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") - ) - and date(datetime_captura) - <= date_add(date("{{ var('end_date') }}"), interval 6 day) - ), - -- 4. Viagens realizadas - viagem as ( - select - data, - servico_realizado as servico, - datetime_partida, - datetime_chegada, - id_veiculo, - id_viagem, - distancia_planejada - from {{ ref("viagem_completa") }} - -- rj-smtr.projeto_subsidio_sppo.viagem_completa - where - data - between date_sub(date("{{ var('start_date') }}"), interval 1 day) and date( - "{{ var('end_date') }}" - ) - ), - -- 5. Status dos veículos - veiculos as ( - select data, id_veiculo, status - from {{ ref("sppo_veiculo_dia") }} - -- rj-smtr.veiculo.sppo_veiculo_dia - where - data - between date("{{ var('start_date') }}") and date("{{ var('end_date') }}") - ), - -- 6. Viagem, para fins de contagem de passageiros, com tolerância de 30 minutos, - -- limitada pela viagem anterior - viagem_com_tolerancia_previa as ( - select - v.*, - lag(v.datetime_chegada) over ( - partition by v.id_veiculo order by v.datetime_partida - ) as viagem_anterior_chegada, - case - when - lag(v.datetime_chegada) over ( - partition by v.id_veiculo order by v.datetime_partida - ) - is null - then datetime(timestamp_sub(datetime_partida, interval 30 minute)) - else - datetime( - timestamp_add( - greatest( - timestamp_sub(datetime_partida, interval 30 minute), - lag(v.datetime_chegada) over ( - partition by v.id_veiculo - order by v.datetime_partida - ) - ), - interval 1 second - ) - ) - end as datetime_partida_com_tolerancia - from viagem as v - ), - -- 7. Considera apenas as viagens realizadas no período de apuração - viagem_com_tolerancia as ( - select * - from viagem_com_tolerancia_previa - where - data - between date("{{ var('start_date') }}") and date("{{ var('end_date') }}") - ), - -- 8. Contagem de transações Jaé - transacao_contagem as ( - select v.data, v.id_viagem, count(t.datetime_transacao) as quantidade_transacao - from transacao as t - join - viagem_com_tolerancia as v - on t.id_veiculo = substr(v.id_veiculo, 2) - and t.datetime_transacao - between v.datetime_partida_com_tolerancia and v.datetime_chegada - group by v.data, v.id_viagem - ), - -- 9. Contagem de transações RioCard - transacao_riocard_contagem as ( - select - v.data, - v.id_viagem, - count(tr.datetime_transacao) as quantidade_transacao_riocard - from transacao_riocard as tr - join - viagem_com_tolerancia as v - on tr.id_veiculo = substr(v.id_veiculo, 2) - and tr.datetime_transacao - between v.datetime_partida_com_tolerancia and v.datetime_chegada - group by v.data, v.id_viagem - ), - -- 10. Ajusta estado do equipamento - -- Agrupa mesma posição para mesmo validador e veículo, mantendo preferencialmente - -- o estado do equipamento "ABERTO" quanto latitude e longitude for diferente de - -- (0,0) - estado_equipamento_aux as ( - select * - from - ( - ( - select - data, - id_validador, - id_veiculo, - latitude, - longitude, - if( - count(case when estado_equipamento = "ABERTO" then 1 end) - >= 1, - "ABERTO", - "FECHADO" - ) as estado_equipamento, - min(datetime_gps) as datetime_gps, - from gps_validador - where - ( - data >= date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") - and latitude != 0 - and longitude != 0 - ) - or data < date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") - group by 1, 2, 3, 4, 5 - ) - union all - ( - select - data, - id_validador, - id_veiculo, - latitude, - longitude, - estado_equipamento, - datetime_gps, - from gps_validador - where - data >= date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") - and latitude = 0 - and longitude = 0 - ) - ) - ), - -- 11. Relacionamento entre estado do equipamento e viagem - gps_validador_viagem as ( - select - v.data, - e.datetime_gps, - v.id_viagem, - e.id_validador, - e.estado_equipamento, - e.latitude, - e.longitude - from estado_equipamento_aux as e - join - viagem as v - on e.id_veiculo = substr(v.id_veiculo, 2) - and e.datetime_gps between v.datetime_partida and v.datetime_chegada - ), - -- 12. Calcula a porcentagem de estado do equipamento "ABERTO" por validador e - -- viagem - estado_equipamento_perc as ( - select - data, - id_viagem, - id_validador, - countif(estado_equipamento = "ABERTO") - / count(*) as percentual_estado_equipamento_aberto - from gps_validador_viagem - group by 1, 2, 3 - ), - -- 13. Considera o validador com maior porcentagem de estado do equipamento - -- "ABERTO" por viagem - estado_equipamento_max_perc as ( - select - data, - id_viagem, - max_by(id_validador, percentual_estado_equipamento_aberto) as id_validador, - max( - percentual_estado_equipamento_aberto - ) as percentual_estado_equipamento_aberto - from estado_equipamento_perc - group by 1, 2 - ), - -- 14. Verifica se a viagem possui estado do equipamento "ABERTO" em pelo menos - -- 80% dos registros - estado_equipamento_verificacao as ( - select - data, - id_viagem, - id_validador, - percentual_estado_equipamento_aberto, - if( - percentual_estado_equipamento_aberto >= 0.8 - or percentual_estado_equipamento_aberto is null, - true, - false - ) as indicador_estado_equipamento_aberto - from viagem - left join estado_equipamento_max_perc using (data, id_viagem) - ) select - v.data, - v.id_viagem, - v.id_veiculo, - v.servico, - eev.id_validador, - case - when - v.data >= date("{{ var('DATA_SUBSIDIO_V8_INICIO') }}") - and ( - ( - v.data < date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") - and ( - coalesce(tr.quantidade_transacao_riocard, 0) = 0 - or coalesce(eev.indicador_estado_equipamento_aberto, false) - = false - ) - ) - or ( - v.data >= date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") - and ( - ( - coalesce(tr.quantidade_transacao_riocard, 0) = 0 - and coalesce(t.quantidade_transacao, 0) = 0 - ) - or coalesce(eev.indicador_estado_equipamento_aberto, false) - = false - ) - ) - ) - and ve.status - in ("Licenciado com ar e não autuado", "Licenciado sem ar e não autuado") - and v.datetime_partida not between "2024-10-06 06:00:00" - and "2024-10-06 20:00:00" -- Eleição (2024-10-06) - then "Sem transação" - else ve.status - end as tipo_viagem, - v.distancia_planejada, - coalesce(t.quantidade_transacao, 0) as quantidade_transacao, - coalesce(tr.quantidade_transacao_riocard, 0) as quantidade_transacao_riocard, - eev.percentual_estado_equipamento_aberto, - eev.indicador_estado_equipamento_aberto, - v.datetime_partida_com_tolerancia as datetime_partida_bilhetagem, - v.datetime_partida, - v.datetime_chegada, - current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao -from viagem_com_tolerancia as v -left join veiculos as ve using (data, id_veiculo) -left join transacao_contagem as t using (data, id_viagem) -left join transacao_riocard_contagem as tr using (data, id_viagem) -left join estado_equipamento_verificacao as eev using (data, id_viagem) + data, + id_viagem, + id_veiculo, + servico, + id_validador, + tipo_viagem, + distancia_planejada, + quantidade_transacao, + quantidade_transacao_riocard, + percentual_estado_equipamento_aberto, + indicador_estado_equipamento_aberto, + datetime_partida_bilhetagem, + datetime_partida, + datetime_chegada, + datetime_ultima_atualizacao +from {{ref('viagem_transacao_aux')}} diff --git a/queries/models/subsidio/viagem_transacao_aux.sql b/queries/models/subsidio/viagem_transacao_aux.sql new file mode 100644 index 00000000..ffd1f058 --- /dev/null +++ b/queries/models/subsidio/viagem_transacao_aux.sql @@ -0,0 +1,307 @@ +{{ config(materialized="ephemeral") }} + +{% if var('tipo_materializacao') == 'monitoramento' %} + {% set interval_minutes = 120 %} +{% elif var('tipo_materializacao') == 'subsidio' %} + {% set interval_minutes = 30 %} +{% endif %} +with + -- 1. Transações Jaé + transacao as ( + select id_veiculo, datetime_transacao + from {{ ref("transacao") }} + -- rj-smtr.br_rj_riodejaneiro_bilhetagem.transacao + where + data between date("{{ var('start_date') }}") and date_add( + date("{{ var('end_date') }}"), interval 1 day + ) + and date(datetime_processamento) - date(datetime_transacao) <= interval 6 day + ), + -- 2. Transações RioCard + transacao_riocard as ( + select id_veiculo, datetime_transacao + from {{ ref("transacao_riocard") }} + -- rj-smtr.br_rj_riodejaneiro_bilhetagem.transacao_riocard + where + data between date("{{ var('start_date') }}") and date_add( + date("{{ var('end_date') }}"), interval 1 day + ) + and date(datetime_processamento) - date(datetime_transacao) <= interval 6 day + ), + -- 3. GPS Validador + gps_validador as ( + select + data, + datetime_gps, + id_veiculo, + id_validador, + estado_equipamento, + latitude, + longitude + from {{ ref("gps_validador") }} + -- rj-smtr.br_rj_riodejaneiro_bilhetagem.gps_validador + where + data between date("{{ var('start_date') }}") and date_add( + date("{{ var('end_date') }}"), interval 1 day + ) + and ( + ( + data < date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") + and (latitude != 0 or longitude != 0) + ) + or data >= date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") + ) + and date(datetime_captura) - date(datetime_gps) <= interval 6 day + ), + -- 4. Viagens realizadas + viagem as ( + select + data, + servico_realizado as servico, + datetime_partida, + datetime_chegada, + id_veiculo, + id_viagem, + distancia_planejada, + sentido + from {{ ref("viagem_completa") }} + -- rj-smtr.projeto_subsidio_sppo.viagem_completa + where + data + between date_sub(date("{{ var('start_date') }}"), interval 1 day) and date( + "{{ var('end_date') }}" + ) + ), + -- 5. Status dos veículos + veiculos as ( + select data, id_veiculo, status + from {{ ref("sppo_veiculo_dia") }} + -- rj-smtr.veiculo.sppo_veiculo_dia + where + data + between date("{{ var('start_date') }}") and date("{{ var('end_date') }}") + ), + -- 6. Viagem, para fins de contagem de passageiros, com tolerância de 30 minutos, + -- limitada pela viagem anterior + viagem_com_tolerancia_previa as ( + select + v.*, + lag(v.datetime_chegada) over ( + partition by v.id_veiculo order by v.datetime_partida + ) as viagem_anterior_chegada, + case + when + lag(v.datetime_chegada) over ( + partition by v.id_veiculo order by v.datetime_partida + ) + is null + then datetime(timestamp_sub(datetime_partida, interval {{ interval_minutes }} minute)) + else + datetime( + timestamp_add( + greatest( + timestamp_sub(datetime_partida, interval {{ interval_minutes }} minute), + lag(v.datetime_chegada) over ( + partition by v.id_veiculo + order by v.datetime_partida + ) + ), + interval 1 second + ) + ) + end as datetime_partida_com_tolerancia + from viagem as v + ), + -- 7. Considera apenas as viagens realizadas no período de apuração + viagem_com_tolerancia as ( + select * + from viagem_com_tolerancia_previa + where + data + between date("{{ var('start_date') }}") and date("{{ var('end_date') }}") + ), + -- 8. Contagem de transações Jaé + transacao_contagem as ( + select v.data, v.id_viagem, count(t.datetime_transacao) as quantidade_transacao + from transacao as t + join + viagem_com_tolerancia as v + on t.id_veiculo = substr(v.id_veiculo, 2) + and t.datetime_transacao + between v.datetime_partida_com_tolerancia and v.datetime_chegada + group by v.data, v.id_viagem + ), + -- 9. Contagem de transações RioCard + transacao_riocard_contagem as ( + select + v.data, + v.id_viagem, + count(tr.datetime_transacao) as quantidade_transacao_riocard + from transacao_riocard as tr + join + viagem_com_tolerancia as v + on tr.id_veiculo = substr(v.id_veiculo, 2) + and tr.datetime_transacao + between v.datetime_partida_com_tolerancia and v.datetime_chegada + group by v.data, v.id_viagem + ), + -- 10. Ajusta estado do equipamento + -- Agrupa mesma posição para mesmo validador e veículo, mantendo preferencialmente + -- o estado do equipamento "ABERTO" quanto latitude e longitude for diferente de + -- (0,0) + estado_equipamento_aux as ( + select * + from + ( + ( + select + data, + id_validador, + id_veiculo, + latitude, + longitude, + if( + count(case when estado_equipamento = "ABERTO" then 1 end) + >= 1, + "ABERTO", + "FECHADO" + ) as estado_equipamento, + min(datetime_gps) as datetime_gps, + from gps_validador + where + ( + data >= date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") + and latitude != 0 + and longitude != 0 + ) + or data < date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") + group by 1, 2, 3, 4, 5 + ) + union all + ( + select + data, + id_validador, + id_veiculo, + latitude, + longitude, + estado_equipamento, + datetime_gps, + from gps_validador + where + data >= date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") + and latitude = 0 + and longitude = 0 + ) + ) + ), + -- 11. Relacionamento entre estado do equipamento e viagem + gps_validador_viagem as ( + select + v.data, + e.datetime_gps, + v.id_viagem, + e.id_validador, + e.estado_equipamento, + e.latitude, + e.longitude + from estado_equipamento_aux as e + join + viagem as v + on e.id_veiculo = substr(v.id_veiculo, 2) + and e.datetime_gps between v.datetime_partida and v.datetime_chegada + ), + -- 12. Calcula a porcentagem de estado do equipamento "ABERTO" por validador e + -- viagem + estado_equipamento_perc as ( + select + data, + id_viagem, + id_validador, + countif(estado_equipamento = "ABERTO") + / count(*) as percentual_estado_equipamento_aberto + from gps_validador_viagem + group by 1, 2, 3 + ), + -- 13. Considera o validador com maior porcentagem de estado do equipamento + -- "ABERTO" por viagem + estado_equipamento_max_perc as ( + select + data, + id_viagem, + max_by(id_validador, percentual_estado_equipamento_aberto) as id_validador, + max( + percentual_estado_equipamento_aberto + ) as percentual_estado_equipamento_aberto + from estado_equipamento_perc + group by 1, 2 + ), + -- 14. Verifica se a viagem possui estado do equipamento "ABERTO" em pelo menos + -- 80% dos registros + estado_equipamento_verificacao as ( + select + data, + id_viagem, + id_validador, + percentual_estado_equipamento_aberto, + if( + percentual_estado_equipamento_aberto >= 0.8 + or percentual_estado_equipamento_aberto is null, + true, + false + ) as indicador_estado_equipamento_aberto + from viagem + left join estado_equipamento_max_perc using (data, id_viagem) + ) +select + v.data, + v.id_viagem, + v.id_veiculo, + v.servico, + eev.id_validador, + case + when + v.data >= date("{{ var('DATA_SUBSIDIO_V8_INICIO') }}") + and ( + ( + v.data < date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") + and ( + coalesce(tr.quantidade_transacao_riocard, 0) = 0 + or coalesce(eev.indicador_estado_equipamento_aberto, false) + = false + ) + ) + or ( + v.data >= date("{{ var('DATA_SUBSIDIO_V12_INICIO') }}") + and ( + ( + coalesce(tr.quantidade_transacao_riocard, 0) = 0 + and coalesce(t.quantidade_transacao, 0) = 0 + ) + or coalesce(eev.indicador_estado_equipamento_aberto, false) + = false + ) + ) + ) + and ve.status + in ("Licenciado com ar e não autuado", "Licenciado sem ar e não autuado") + and v.datetime_partida not between "2024-10-06 06:00:00" + and "2024-10-06 20:00:00" -- Eleição (2024-10-06) + then "Sem transação" + else ve.status + end as tipo_viagem, + v.sentido, + v.distancia_planejada, + coalesce(t.quantidade_transacao, 0) as quantidade_transacao, + coalesce(tr.quantidade_transacao_riocard, 0) as quantidade_transacao_riocard, + eev.percentual_estado_equipamento_aberto, + eev.indicador_estado_equipamento_aberto, + v.datetime_partida_com_tolerancia as datetime_partida_bilhetagem, + v.datetime_partida, + v.datetime_chegada, + current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao +from viagem_com_tolerancia as v +left join veiculos as ve using (data, id_veiculo) +left join transacao_contagem as t using (data, id_viagem) +left join transacao_riocard_contagem as tr using (data, id_viagem) +left join estado_equipamento_verificacao as eev using (data, id_viagem) diff --git a/queries/selectors.yml b/queries/selectors.yml index 27cd475f..8a24f258 100644 --- a/queries/selectors.yml +++ b/queries/selectors.yml @@ -24,6 +24,8 @@ selectors: value: models/financeiro - method: path value: models/dashboard_subsidio_sppo_v2 + - method: fqn + value: monitoramento_viagem_transacao - name: viagem_informada description: Materialização da tabela de viagens informadas @@ -86,3 +88,9 @@ selectors: value: integracao_invalida - method: fqn value: integracao_nao_realizada + + - name: monitoramento_subsidio + description: Materialização das tabelas utilizadas nos dashboards do subsidio + definition: + method: fqn + value: monitoramento_viagem_transacao \ No newline at end of file