Skip to content

Commit

Permalink
Viagens 2.0 - Ajustes na validação (#392)
Browse files Browse the repository at this point in the history
* ajustes viagens 2

* link pr

* adiciona tipo_os

* corrige filtro data gps em prd

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
pixuimpou and mergify[bot] authored Jan 22, 2025
1 parent a9b093b commit a19ab00
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 43 deletions.
2 changes: 1 addition & 1 deletion pipelines/treatment/monitoramento/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Flows de tratamento dos dados de monitoramento
DBT 2024-12-20
DBT 2025-01-22
"""

from copy import deepcopy
Expand Down
15 changes: 15 additions & 0 deletions queries/models/monitoramento/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# Changelog - monitoramento

## [1.3.0] - 2025-01-22

### Alterado

- Adiciona as colunas `indicador_servico_planejado_gtfs`, `indicador_servico_planejado_os`, `indicador_servico_divergente` e `indicador_shape_invalido` no modelo `viagem_validacao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392)

- Adiciona a coluna `indicador_servico_divergente` no modelo `gps_segmento_viagem.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392)

- Remove tratamento de sentido no modelo `viagem_informada_monitoramento.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392)

### Corrigido

- Corrige trip_id inteiro nos modelos `staging_viagem_informada_rioonibus.sql` e `staging_viagem_informada_brt.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392)

- Corrige filtro incremental no modelo `viagem_informada_monitoramento.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/392)

## [1.2.2] - 2025-01-08

Expand Down
38 changes: 27 additions & 11 deletions queries/models/monitoramento/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,23 @@ models:
data_type: date
quote: true
- name: id_viagem
description: "{{ doc('id_viagem') }}"
description: "Código único identificador da viagem."
data_type: string
quote: true
- name: datetime_partida
description: "{{ doc('datetime_partida') }}"
description: "Horário de início da viagem."
data_type: datetime
quote: true
- name: datetime_chegada
description: "{{ doc('datetime_chegada') }}"
description: "Horário de fim da viagem."
data_type: datetime
quote: true
- name: modo
description: "{{ doc('modo') }}"
description: "Tipo de transporte (Ônibus, Van, BRT)"
data_type: string
quote: true
- name: id_veiculo
description: "{{ doc('id_veiculo') }}"
description: "Código identificador do veículo (número de ordem)."
data_type: string
quote: true
- name: trip_id
Expand All @@ -106,11 +106,11 @@ models:
data_type: string
quote: true
- name: servico
description: "{{ doc('servico') }}"
description: "Serviço realizado pelo veículo."
data_type: string
quote: true
- name: sentido
description: "{{ doc('sentido') }}"
description: "Sentido da linha"
data_type: string
quote: true
- name: quantidade_segmentos_verificados
Expand Down Expand Up @@ -142,22 +142,38 @@ models:
data_type: float64
quote: true
- name: tipo_dia
description: "{{ doc('tipo_dia') }}"
description: "Dia da semana - categorias: Dia Útil, Sábado, Domingo"
data_type: string
quote: true
- name: feed_version
description: "{{ doc('feed_version') }}"
description: "String que indica a versão atual do conjunto de dados GTFS."
data_type: string
quote: true
- name: feed_start_date
description: "Data inicial do feed (versão)."
data_type: date
quote: true
- name: versao
description: "{{ doc('versao') }}"
description: "Código de controle de versão do dado (SHA Github)"
data_type: string
quote: true
- name: datetime_ultima_atualizacao
description: "{{ doc('datetime_ultima_atualizacao') }}"
description: "Última atualização (GMT-3)."
data_type: datetime
quote: true
- name: indicador_servico_planejado_gtfs
description: Indica se o serviço entava planejado para o tipo dia no GTFS
data_type: boolean
quote: true
- name: indicador_servico_planejado_os
description: Indica se o serviço estava planejado para a faixa horária na OS (apenas para SPPO)
data_type: boolean
quote: true
- name: indicador_servico_divergente
description: Indica se o serviço indicado nos dados de gps estava diferente do serviço informado na viagem
data_type: boolean
quote: true
- name: indicador_shape_invalido
description: Indica se o shape existe no GTFS no feed vigente durante a data da viagem
data_type: boolean
quote: true
30 changes: 23 additions & 7 deletions queries/models/monitoramento/staging/gps_segmento_viagem.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ with
gv.id_viagem,
gv.shape_id,
gv.geo_point_gps,
gv.servico_viagem,
gv.servico_gps,
c.feed_version,
c.feed_start_date
from {{ ref("gps_viagem") }} gv
Expand All @@ -68,15 +70,27 @@ with
where feed_start_date in ({{ gtfs_feeds | join(", ") }})
{% endif %}
),
gps_segmento as (
servico_divergente as (
select
id_viagem,
max(servico_viagem != servico_gps) as indicador_servico_divergente
from gps_viagem
group by 1
),
gps_servico_segmento as (
select g.id_viagem, g.shape_id, s.id_segmento, count(*) as quantidade_gps
from gps_viagem g
join
segmento s
on g.feed_version = s.feed_version
and g.shape_id = s.shape_id
and st_intersects(s.buffer, g.geo_point_gps)
group by 1, 2, 3
where g.servico_gps = g.servico_viagem
group by all
),
gps_segmento as (
select id_viagem, g.shape_id, g.id_segmento, g.quantidade_gps,
from gps_servico_segmento g
),
viagem as (
select
Expand Down Expand Up @@ -109,17 +123,17 @@ with
v.id_veiculo,
v.trip_id,
v.route_id,
shape_id,
v.shape_id,
s.id_segmento,
s.indicador_segmento_desconsiderado,
v.servico,
v.sentido,
v.service_ids,
v.tipo_dia,
feed_version,
feed_start_date
v.feed_version,
v.feed_start_date
from viagem v
join segmento s using (shape_id, feed_version, feed_start_date)
left join segmento s using (shape_id, feed_version, feed_start_date)
)
select
v.data,
Expand All @@ -132,10 +146,11 @@ select
v.route_id,
shape_id,
id_segmento,
v.indicador_segmento_desconsiderado,
v.servico,
v.sentido,
ifnull(g.quantidade_gps, 0) as quantidade_gps,
v.indicador_segmento_desconsiderado,
s.indicador_servico_divergente,
v.feed_version,
v.feed_start_date,
v.service_ids,
Expand All @@ -144,6 +159,7 @@ select
current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao
from viagem_segmento v
left join gps_segmento g using (id_viagem, shape_id, id_segmento)
left join servico_divergente s using (id_viagem)
{% if not is_incremental() %}
where v.data <= date_sub(current_date("America/Sao_Paulo"), interval 2 day)
{% endif %}
4 changes: 2 additions & 2 deletions queries/models/monitoramento/staging/gps_viagem.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ select
g.timestamp_gps,
v.modo,
g.id_veiculo,
g.servico,
v.servico as servico_viagem,
g.servico as servico_gps,
v.sentido,
g.latitude,
g.longitude,
Expand All @@ -109,7 +110,6 @@ join
viagem v
on g.timestamp_gps between v.datetime_partida and v.datetime_chegada
and g.id_veiculo = v.id_veiculo
and g.servico = v.servico
and g.fornecedor = v.fonte_gps
{% if not is_incremental() %}
where v.data <= date_sub(current_date("America/Sao_Paulo"), interval 2 day)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ select
safe_cast(json_value(content, '$.sentido') as string) as sentido,
safe_cast(json_value(content, '$.servico') as string) as servico,
safe_cast(json_value(content, '$.shape_id') as string) as shape_id,
safe_cast(json_value(content, '$.trip_id') as integer) as trip_id
safe_cast(json_value(content, '$.trip_id') as string) as trip_id
from {{ source("source_sonda", "viagem_informada") }}
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ select
safe_cast(json_value(content, '$.sentido') as string) as sentido,
safe_cast(json_value(content, '$.servico') as string) as servico,
safe_cast(json_value(content, '$.shape_id') as string) as shape_id,
safe_cast(json_value(content, '$.trip_id') as integer) as trip_id
safe_cast(json_value(content, '$.trip_id') as string) as trip_id
from {{ source("source_rioonibus", "viagem_informada") }}
14 changes: 4 additions & 10 deletions queries/models/monitoramento/viagem_informada_monitoramento.sql
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,7 @@ with
route_id,
shape_id,
servico,
case
when sentido = 'I'
then 'Ida'
when sentido = 'V'
then 'Volta'
when sentido = 'C'
then 'Circular'
else sentido
end as sentido,
sentido,
fonte_gps,
datetime_processamento,
datetime_captura
Expand Down Expand Up @@ -154,7 +146,9 @@ with
calendario as (
select *
from {{ calendario }}
{% if is_incremental() %} where {{ incremental_filter }} {% endif %}
{% if is_incremental() %}
where data in ({{ partitions | join(", ") }})
{% endif %}
),
routes as (
select *
Expand Down
75 changes: 65 additions & 10 deletions queries/models/monitoramento/viagem_validacao.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,20 @@ with
shape_id,
servico,
sentido,
count(*) as quantidade_segmentos_verificados,
countif(id_segmento is not null) as quantidade_segmentos_verificados,
countif(quantidade_gps > 0) as quantidade_segmentos_validos,
max(indicador_servico_divergente) as indicador_servico_divergente,
max(id_segmento is null) as indicador_shape_invalido,
service_ids,
tipo_dia,
feed_version,
feed_start_date
from {{ ref("gps_segmento_viagem") }}
where
not indicador_segmento_desconsiderado
(
not indicador_segmento_desconsiderado
or indicador_segmento_desconsiderado is null
)
{% if is_incremental() %} and {{ incremental_filter }} {% endif %}
group by
data,
Expand Down Expand Up @@ -83,8 +88,11 @@ with
sentido,
quantidade_segmentos_verificados,
quantidade_segmentos_validos,
quantidade_segmentos_validos
/ quantidade_segmentos_verificados as indice_validacao,
safe_divide(
quantidade_segmentos_validos, quantidade_segmentos_verificados
) as indice_validacao,
indicador_servico_divergente,
indicador_shape_invalido,
service_ids,
tipo_dia,
feed_version,
Expand All @@ -104,17 +112,55 @@ with
{% endif %}
group by 1, 2, 3
),
servicos_planejados as (
servicos_planejados_gtfs as (
select
i.*,
(
select count(*)
from unnest(i.service_ids) as service_id
join unnest(t.service_ids) as service_id using (service_id)
)
> 0 as indicador_servico_planejado
> 0 as indicador_servico_planejado_gtfs
from indice i
left join trips t using (feed_start_date, feed_version, route_id)
),
viagem_planejada as (
select *
from {{ ref("viagem_planejada") }}
{# from `rj-smtr.projeto_subsidio_sppo.viagem_planejada` #}
{% if is_incremental() %} where {{ incremental_filter }} {% endif %}
qualify
row_number() over (
partition by data, servico, sentido, faixa_horaria_inicio
order by distancia_planejada desc
)
= 1
),
servicos_planejados_os as (
select
sp.*,
case
when
vp.distancia_total_planejada is not null
and vp.distancia_total_planejada > 0
then true
when
(
vp.distancia_total_planejada is not null
and vp.distancia_total_planejada <= 0
)
or (
vp.distancia_total_planejada is null and sp.modo = "Ônibus SPPO"
)
then false
end as indicador_servico_planejado_os
from servicos_planejados_gtfs sp
left join
viagem_planejada vp
on vp.servico = sp.servico
and vp.sentido = sp.sentido
and vp.data = sp.data
and sp.datetime_partida between faixa_horaria_inicio and faixa_horaria_fim
)
select
data,
Expand All @@ -132,13 +178,22 @@ select
quantidade_segmentos_validos,
indice_validacao,
indice_validacao >= {{ var("parametro_validacao") }} as indicador_trajeto_valido,
indicador_servico_planejado,
indice_validacao >= {{ var("parametro_validacao") }}
and indicador_servico_planejado as indicador_viagem_valida,
indicador_servico_planejado_gtfs,
indicador_servico_planejado_os,
indicador_servico_divergente,
indicador_shape_invalido,
(
shape_id is not null
and route_id is not null
and not indicador_shape_invalido
and indice_validacao >= {{ var("parametro_validacao") }}
and indicador_servico_planejado_gtfs
and ifnull(indicador_servico_planejado_os, true)
) as indicador_viagem_valida,
{{ var("parametro_validacao") }} as parametro_validacao,
tipo_dia,
feed_version,
feed_start_date,
'{{ var("version") }}' as versao,
current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao
from servicos_planejados
from servicos_planejados_os
Loading

0 comments on commit a19ab00

Please sign in to comment.