From a19ab009aa54cc936d1692693c8f3cce6c0e989c Mon Sep 17 00:00:00 2001 From: Rafael Carvalho Pinheiro <74972217+pixuimpou@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:37:45 -0300 Subject: [PATCH] =?UTF-8?q?Viagens=202.0=20-=20Ajustes=20na=20valida=C3=A7?= =?UTF-8?q?=C3=A3o=20(#392)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ajustes viagens 2 * link pr * adiciona tipo_os * corrige filtro data gps em prd --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- pipelines/treatment/monitoramento/flows.py | 2 +- queries/models/monitoramento/CHANGELOG.md | 15 ++++ queries/models/monitoramento/schema.yml | 38 +++++++--- .../staging/gps_segmento_viagem.sql | 30 ++++++-- .../monitoramento/staging/gps_viagem.sql | 4 +- .../staging/staging_viagem_informada_brt.sql | 2 +- .../staging_viagem_informada_rioonibus.sql | 2 +- .../viagem_informada_monitoramento.sql | 14 +--- .../models/monitoramento/viagem_validacao.sql | 75 ++++++++++++++++--- .../staging/aux_calendario_manual.sql | 16 ++++ 10 files changed, 155 insertions(+), 43 deletions(-) diff --git a/pipelines/treatment/monitoramento/flows.py b/pipelines/treatment/monitoramento/flows.py index a6cb1a1c..16f6a76c 100644 --- a/pipelines/treatment/monitoramento/flows.py +++ b/pipelines/treatment/monitoramento/flows.py @@ -2,7 +2,7 @@ """ Flows de tratamento dos dados de monitoramento -DBT 2024-12-20 +DBT 2025-01-22 """ from copy import deepcopy diff --git a/queries/models/monitoramento/CHANGELOG.md b/queries/models/monitoramento/CHANGELOG.md index 00f69fff..affb9974 100644 --- a/queries/models/monitoramento/CHANGELOG.md +++ b/queries/models/monitoramento/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog - monitoramento +## [1.3.0] - 2025-01-22 + +### Alterado + +- Adiciona as colunas `indicador_servico_planejado_gtfs`, `indicador_servico_planejado_os`, `indicador_servico_divergente` e `indicador_shape_invalido` no modelo `viagem_validacao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392) + +- Adiciona a coluna `indicador_servico_divergente` no modelo `gps_segmento_viagem.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392) + +- Remove tratamento de sentido no modelo `viagem_informada_monitoramento.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392) + +### Corrigido + +- Corrige trip_id inteiro nos modelos `staging_viagem_informada_rioonibus.sql` e `staging_viagem_informada_brt.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392) + +- Corrige filtro incremental no modelo `viagem_informada_monitoramento.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392) ## [1.2.2] - 2025-01-08 diff --git a/queries/models/monitoramento/schema.yml b/queries/models/monitoramento/schema.yml index ae53a6ca..4ef98771 100644 --- a/queries/models/monitoramento/schema.yml +++ b/queries/models/monitoramento/schema.yml @@ -74,23 +74,23 @@ models: data_type: date quote: true - name: id_viagem - description: "{{ doc('id_viagem') }}" + description: "Código único identificador da viagem." data_type: string quote: true - name: datetime_partida - description: "{{ doc('datetime_partida') }}" + description: "Horário de início da viagem." data_type: datetime quote: true - name: datetime_chegada - description: "{{ doc('datetime_chegada') }}" + description: "Horário de fim da viagem." data_type: datetime quote: true - name: modo - description: "{{ doc('modo') }}" + description: "Tipo de transporte (Ônibus, Van, BRT)" data_type: string quote: true - name: id_veiculo - description: "{{ doc('id_veiculo') }}" + description: "Código identificador do veículo (número de ordem)." data_type: string quote: true - name: trip_id @@ -106,11 +106,11 @@ models: data_type: string quote: true - name: servico - description: "{{ doc('servico') }}" + description: "Serviço realizado pelo veículo." data_type: string quote: true - name: sentido - description: "{{ doc('sentido') }}" + description: "Sentido da linha" data_type: string quote: true - name: quantidade_segmentos_verificados @@ -142,11 +142,11 @@ models: data_type: float64 quote: true - name: tipo_dia - description: "{{ doc('tipo_dia') }}" + description: "Dia da semana - categorias: Dia Útil, Sábado, Domingo" data_type: string quote: true - name: feed_version - description: "{{ doc('feed_version') }}" + description: "String que indica a versão atual do conjunto de dados GTFS." data_type: string quote: true - name: feed_start_date @@ -154,10 +154,26 @@ models: data_type: date quote: true - name: versao - description: "{{ doc('versao') }}" + description: "Código de controle de versão do dado (SHA Github)" data_type: string quote: true - name: datetime_ultima_atualizacao - description: "{{ doc('datetime_ultima_atualizacao') }}" + description: "Última atualização (GMT-3)." data_type: datetime quote: true + - name: indicador_servico_planejado_gtfs + description: Indica se o serviço entava planejado para o tipo dia no GTFS + data_type: boolean + quote: true + - name: indicador_servico_planejado_os + description: Indica se o serviço estava planejado para a faixa horária na OS (apenas para SPPO) + data_type: boolean + quote: true + - name: indicador_servico_divergente + description: Indica se o serviço indicado nos dados de gps estava diferente do serviço informado na viagem + data_type: boolean + quote: true + - name: indicador_shape_invalido + description: Indica se o shape existe no GTFS no feed vigente durante a data da viagem + data_type: boolean + quote: true diff --git a/queries/models/monitoramento/staging/gps_segmento_viagem.sql b/queries/models/monitoramento/staging/gps_segmento_viagem.sql index 8fa8857e..63988c62 100644 --- a/queries/models/monitoramento/staging/gps_segmento_viagem.sql +++ b/queries/models/monitoramento/staging/gps_segmento_viagem.sql @@ -47,6 +47,8 @@ with gv.id_viagem, gv.shape_id, gv.geo_point_gps, + gv.servico_viagem, + gv.servico_gps, c.feed_version, c.feed_start_date from {{ ref("gps_viagem") }} gv @@ -68,7 +70,14 @@ with where feed_start_date in ({{ gtfs_feeds | join(", ") }}) {% endif %} ), - gps_segmento as ( + servico_divergente as ( + select + id_viagem, + max(servico_viagem != servico_gps) as indicador_servico_divergente + from gps_viagem + group by 1 + ), + gps_servico_segmento as ( select g.id_viagem, g.shape_id, s.id_segmento, count(*) as quantidade_gps from gps_viagem g join @@ -76,7 +85,12 @@ with on g.feed_version = s.feed_version and g.shape_id = s.shape_id and st_intersects(s.buffer, g.geo_point_gps) - group by 1, 2, 3 + where g.servico_gps = g.servico_viagem + group by all + ), + gps_segmento as ( + select id_viagem, g.shape_id, g.id_segmento, g.quantidade_gps, + from gps_servico_segmento g ), viagem as ( select @@ -109,17 +123,17 @@ with v.id_veiculo, v.trip_id, v.route_id, - shape_id, + v.shape_id, s.id_segmento, s.indicador_segmento_desconsiderado, v.servico, v.sentido, v.service_ids, v.tipo_dia, - feed_version, - feed_start_date + v.feed_version, + v.feed_start_date from viagem v - join segmento s using (shape_id, feed_version, feed_start_date) + left join segmento s using (shape_id, feed_version, feed_start_date) ) select v.data, @@ -132,10 +146,11 @@ select v.route_id, shape_id, id_segmento, - v.indicador_segmento_desconsiderado, v.servico, v.sentido, ifnull(g.quantidade_gps, 0) as quantidade_gps, + v.indicador_segmento_desconsiderado, + s.indicador_servico_divergente, v.feed_version, v.feed_start_date, v.service_ids, @@ -144,6 +159,7 @@ select current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao from viagem_segmento v left join gps_segmento g using (id_viagem, shape_id, id_segmento) +left join servico_divergente s using (id_viagem) {% if not is_incremental() %} where v.data <= date_sub(current_date("America/Sao_Paulo"), interval 2 day) {% endif %} diff --git a/queries/models/monitoramento/staging/gps_viagem.sql b/queries/models/monitoramento/staging/gps_viagem.sql index 33825780..5d01dd76 100644 --- a/queries/models/monitoramento/staging/gps_viagem.sql +++ b/queries/models/monitoramento/staging/gps_viagem.sql @@ -90,7 +90,8 @@ select g.timestamp_gps, v.modo, g.id_veiculo, - g.servico, + v.servico as servico_viagem, + g.servico as servico_gps, v.sentido, g.latitude, g.longitude, @@ -109,7 +110,6 @@ join viagem v on g.timestamp_gps between v.datetime_partida and v.datetime_chegada and g.id_veiculo = v.id_veiculo - and g.servico = v.servico and g.fornecedor = v.fonte_gps {% if not is_incremental() %} where v.data <= date_sub(current_date("America/Sao_Paulo"), interval 2 day) diff --git a/queries/models/monitoramento/staging/staging_viagem_informada_brt.sql b/queries/models/monitoramento/staging/staging_viagem_informada_brt.sql index 3674a82a..2a60d195 100644 --- a/queries/models/monitoramento/staging/staging_viagem_informada_brt.sql +++ b/queries/models/monitoramento/staging/staging_viagem_informada_brt.sql @@ -39,5 +39,5 @@ select safe_cast(json_value(content, '$.sentido') as string) as sentido, safe_cast(json_value(content, '$.servico') as string) as servico, safe_cast(json_value(content, '$.shape_id') as string) as shape_id, - safe_cast(json_value(content, '$.trip_id') as integer) as trip_id + safe_cast(json_value(content, '$.trip_id') as string) as trip_id from {{ source("source_sonda", "viagem_informada") }} diff --git a/queries/models/monitoramento/staging/staging_viagem_informada_rioonibus.sql b/queries/models/monitoramento/staging/staging_viagem_informada_rioonibus.sql index 6b7c92db..f22af07c 100644 --- a/queries/models/monitoramento/staging/staging_viagem_informada_rioonibus.sql +++ b/queries/models/monitoramento/staging/staging_viagem_informada_rioonibus.sql @@ -40,5 +40,5 @@ select safe_cast(json_value(content, '$.sentido') as string) as sentido, safe_cast(json_value(content, '$.servico') as string) as servico, safe_cast(json_value(content, '$.shape_id') as string) as shape_id, - safe_cast(json_value(content, '$.trip_id') as integer) as trip_id + safe_cast(json_value(content, '$.trip_id') as string) as trip_id from {{ source("source_rioonibus", "viagem_informada") }} diff --git a/queries/models/monitoramento/viagem_informada_monitoramento.sql b/queries/models/monitoramento/viagem_informada_monitoramento.sql index b90bf13e..742aca6e 100644 --- a/queries/models/monitoramento/viagem_informada_monitoramento.sql +++ b/queries/models/monitoramento/viagem_informada_monitoramento.sql @@ -112,15 +112,7 @@ with route_id, shape_id, servico, - case - when sentido = 'I' - then 'Ida' - when sentido = 'V' - then 'Volta' - when sentido = 'C' - then 'Circular' - else sentido - end as sentido, + sentido, fonte_gps, datetime_processamento, datetime_captura @@ -154,7 +146,9 @@ with calendario as ( select * from {{ calendario }} - {% if is_incremental() %} where {{ incremental_filter }} {% endif %} + {% if is_incremental() %} + where data in ({{ partitions | join(", ") }}) + {% endif %} ), routes as ( select * diff --git a/queries/models/monitoramento/viagem_validacao.sql b/queries/models/monitoramento/viagem_validacao.sql index 2b30b219..a83753a1 100644 --- a/queries/models/monitoramento/viagem_validacao.sql +++ b/queries/models/monitoramento/viagem_validacao.sql @@ -41,15 +41,20 @@ with shape_id, servico, sentido, - count(*) as quantidade_segmentos_verificados, + countif(id_segmento is not null) as quantidade_segmentos_verificados, countif(quantidade_gps > 0) as quantidade_segmentos_validos, + max(indicador_servico_divergente) as indicador_servico_divergente, + max(id_segmento is null) as indicador_shape_invalido, service_ids, tipo_dia, feed_version, feed_start_date from {{ ref("gps_segmento_viagem") }} where - not indicador_segmento_desconsiderado + ( + not indicador_segmento_desconsiderado + or indicador_segmento_desconsiderado is null + ) {% if is_incremental() %} and {{ incremental_filter }} {% endif %} group by data, @@ -83,8 +88,11 @@ with sentido, quantidade_segmentos_verificados, quantidade_segmentos_validos, - quantidade_segmentos_validos - / quantidade_segmentos_verificados as indice_validacao, + safe_divide( + quantidade_segmentos_validos, quantidade_segmentos_verificados + ) as indice_validacao, + indicador_servico_divergente, + indicador_shape_invalido, service_ids, tipo_dia, feed_version, @@ -104,7 +112,7 @@ with {% endif %} group by 1, 2, 3 ), - servicos_planejados as ( + servicos_planejados_gtfs as ( select i.*, ( @@ -112,9 +120,47 @@ with from unnest(i.service_ids) as service_id join unnest(t.service_ids) as service_id using (service_id) ) - > 0 as indicador_servico_planejado + > 0 as indicador_servico_planejado_gtfs from indice i left join trips t using (feed_start_date, feed_version, route_id) + ), + viagem_planejada as ( + select * + from {{ ref("viagem_planejada") }} + {# from `rj-smtr.projeto_subsidio_sppo.viagem_planejada` #} + {% if is_incremental() %} where {{ incremental_filter }} {% endif %} + qualify + row_number() over ( + partition by data, servico, sentido, faixa_horaria_inicio + order by distancia_planejada desc + ) + = 1 + ), + servicos_planejados_os as ( + select + sp.*, + case + when + vp.distancia_total_planejada is not null + and vp.distancia_total_planejada > 0 + then true + when + ( + vp.distancia_total_planejada is not null + and vp.distancia_total_planejada <= 0 + ) + or ( + vp.distancia_total_planejada is null and sp.modo = "Ônibus SPPO" + ) + then false + end as indicador_servico_planejado_os + from servicos_planejados_gtfs sp + left join + viagem_planejada vp + on vp.servico = sp.servico + and vp.sentido = sp.sentido + and vp.data = sp.data + and sp.datetime_partida between faixa_horaria_inicio and faixa_horaria_fim ) select data, @@ -132,13 +178,22 @@ select quantidade_segmentos_validos, indice_validacao, indice_validacao >= {{ var("parametro_validacao") }} as indicador_trajeto_valido, - indicador_servico_planejado, - indice_validacao >= {{ var("parametro_validacao") }} - and indicador_servico_planejado as indicador_viagem_valida, + indicador_servico_planejado_gtfs, + indicador_servico_planejado_os, + indicador_servico_divergente, + indicador_shape_invalido, + ( + shape_id is not null + and route_id is not null + and not indicador_shape_invalido + and indice_validacao >= {{ var("parametro_validacao") }} + and indicador_servico_planejado_gtfs + and ifnull(indicador_servico_planejado_os, true) + ) as indicador_viagem_valida, {{ var("parametro_validacao") }} as parametro_validacao, tipo_dia, feed_version, feed_start_date, '{{ var("version") }}' as versao, current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao -from servicos_planejados +from servicos_planejados_os diff --git a/queries/models/planejamento/staging/aux_calendario_manual.sql b/queries/models/planejamento/staging/aux_calendario_manual.sql index 8f0c0764..fa8f2f45 100644 --- a/queries/models/planejamento/staging/aux_calendario_manual.sql +++ b/queries/models/planejamento/staging/aux_calendario_manual.sql @@ -36,6 +36,22 @@ with then "Enem" when data = date(2024, 11, 24) then "Parada LGBTQI+" -- Processo.Rio MTR-DES-2024/70057 + when data between date(2024, 12, 07) and date(2024, 12, 08) + then "Extraordinária - Verão" -- Processo.Rio MTR-DES-2024/72800 + when data between date(2024, 12, 14) and date(2024, 12, 15) + then "Extraordinária - Verão" -- Processo.Rio MTR-DES-2024/74396 + when data = date(2024, 12, 23) + then "Fim de ano" -- Processo.Rio MTR-DES-2024/75723 + when data between date(2024, 12, 26) and date(2024, 12, 27) + then "Fim de ano" -- Processo.Rio MTR-DES-2024/75723 + when data = date(2024, 12, 30) + then "Fim de ano" -- Processo.Rio MTR-DES-2024/75723 + when data = date(2024, 12, 31) + then "Vespera de Reveillon" -- Processo.Rio MTR-DES-2024/76453 + when data = date(2025, 01, 01) + then "Reveillon" -- Processo.Rio MTR-DES-2024/76453 + when data between date(2025, 01, 02) and date(2025, 01, 03) + then "Fim de ano" -- Processo.Rio MTR-DES-2024/77046 end as tipo_os from unnest(