From 57f741d0bf79c14f7e2a3f0be43415073ef8f975 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 29 Oct 2024 20:28:43 -0300 Subject: [PATCH 01/10] Commit inicial --- queries/models/monitoramento/gps.sql | 140 ++++++++++++++++++ .../models/monitoramento/staging/aux_gps.sql | 17 +++ .../monitoramento/staging/aux_gps_filtrada | 77 ++++++++++ .../monitoramento/staging/aux_gps_parada.sql | 77 ++++++++++ .../staging/aux_gps_realocacao.sql | 76 ++++++++++ .../staging/aux_gps_trajeto_correto.sql | 95 ++++++++++++ .../staging/aux_gps_velocidade.sql | 91 ++++++++++++ .../monitoramento/staging/aux_realocacao.sql | 17 +++ 8 files changed, 590 insertions(+) create mode 100644 queries/models/monitoramento/gps.sql create mode 100644 queries/models/monitoramento/staging/aux_gps.sql create mode 100644 queries/models/monitoramento/staging/aux_gps_filtrada create mode 100644 queries/models/monitoramento/staging/aux_gps_parada.sql create mode 100644 queries/models/monitoramento/staging/aux_gps_realocacao.sql create mode 100644 queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql create mode 100644 queries/models/monitoramento/staging/aux_gps_velocidade.sql create mode 100644 queries/models/monitoramento/staging/aux_realocacao.sql diff --git a/queries/models/monitoramento/gps.sql b/queries/models/monitoramento/gps.sql new file mode 100644 index 00000000..ae238e2e --- /dev/null +++ b/queries/models/monitoramento/gps.sql @@ -0,0 +1,140 @@ +{{ + config( + materialized='incremental', + partition_by={ + 'field':"data", + 'data_type':'date', + 'granularity': 'day' + }, + tags=['geolocalizacao'] + ) +}} +/* +Descrição: +Junção dos passos de tratamento, junta as informações extras que definimos a partir dos registros +capturados. +Para descrição detalhada de como cada coluna é calculada, consulte a documentação de cada uma das tabelas +utilizadas abaixo. +1. registros_filtrada: filtragem e tratamento básico dos dados brutos capturados. +2. aux_registros_velocidade: estimativa da velocidade de veículo a cada ponto registrado e identificação +do estado de movimento ('parado', 'andando') +3. aux_registros_parada: identifica veículos parados em terminais ou garagens conhecidas +4. aux_registros_flag_trajeto_correto: calcula intersecções das posições registradas para cada veículo +com o traçado da linha informada. +5. As junções (joins) são feitas sobre o id_veículo e a timestamp_gps. +*/ +WITH + registros as ( + -- 1. registros_filtrada + SELECT + id_veiculo, + timestamp_gps, + timestamp_captura, + velocidade, + linha, + latitude, + longitude, + FROM {{ ref('aux_gps_filtrada' ~ var('fonte_gps')) }} + {% if is_incremental() -%} + WHERE + data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") + AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" + {%- endif -%} + ), + velocidades AS ( + -- 2. velocidades + SELECT + id_veiculo, timestamp_gps, linha, velocidade, distancia, flag_em_movimento + FROM + {{ ref('aux_gps_velocidade' ~ var('fonte_gps')) }} + ), + paradas as ( + -- 3. paradas + SELECT + id_veiculo, timestamp_gps, linha, tipo_parada, + FROM {{ ref('aux_gps_parada' ~ var('fonte_gps')) }} + ), + flags AS ( + -- 4. flag_trajeto_correto + SELECT + id_veiculo, + timestamp_gps, + linha, + route_id, + flag_linha_existe_sigmob, + flag_trajeto_correto, + flag_trajeto_correto_hist + FROM + {{ ref('aux_gps_trajeto_correto' ~ var('fonte_gps')) }} + ) +-- 5. Junção final +SELECT + "SPPO" modo, + r.timestamp_gps, + date(r.timestamp_gps) data, + extract(time from r.timestamp_gps) hora, + r.id_veiculo, + r.linha as servico, + r.latitude, + r.longitude, + CASE + WHEN + flag_em_movimento IS true AND flag_trajeto_correto_hist is true + THEN true + ELSE false + END flag_em_operacao, + v.flag_em_movimento, + p.tipo_parada, + flag_linha_existe_sigmob, + flag_trajeto_correto, + flag_trajeto_correto_hist, + CASE + WHEN flag_em_movimento IS true AND flag_trajeto_correto_hist is true + THEN 'Em operação' + WHEN flag_em_movimento is true and flag_trajeto_correto_hist is false + THEN 'Operando fora trajeto' + WHEN flag_em_movimento is false + THEN + CASE + WHEN tipo_parada is not null + THEN concat("Parado ", tipo_parada) + ELSE + CASE + WHEN flag_trajeto_correto_hist is true + THEN 'Parado trajeto correto' + ELSE 'Parado fora trajeto' + END + END + END status, + r.velocidade velocidade_instantanea, + v.velocidade velocidade_estimada_10_min, + v.distancia, + "{{ var("version") }}" as versao +FROM + registros r + +JOIN + flags f +ON + r.id_veiculo = f.id_veiculo + AND r.timestamp_gps = f.timestamp_gps + AND r.linha = f.linha + +JOIN + velocidades v +ON + r.id_veiculo = v.id_veiculo + AND r.timestamp_gps = v.timestamp_gps + AND r.linha = v.linha + +JOIN + paradas p +ON + r.id_veiculo = p.id_veiculo + AND r.timestamp_gps = p.timestamp_gps + AND r.linha = p.linha +{% if is_incremental() -%} + WHERE + date(r.timestamp_gps) between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") + AND r.timestamp_gps > "{{var('date_range_start')}}" and r.timestamp_gps <="{{var('date_range_end')}}" +{%- endif -%} diff --git a/queries/models/monitoramento/staging/aux_gps.sql b/queries/models/monitoramento/staging/aux_gps.sql new file mode 100644 index 00000000..c46fbcd8 --- /dev/null +++ b/queries/models/monitoramento/staging/aux_gps.sql @@ -0,0 +1,17 @@ +{{ config(alias=this.name ~ var('fonte_gps')) }} + +SELECT + SAFE_CAST(ordem AS STRING) ordem, + SAFE_CAST(REPLACE(latitude, ',', '.') AS FLOAT64) latitude, + SAFE_CAST(REPLACE(longitude, ',', '.') AS FLOAT64) longitude, + SAFE_CAST(DATETIME(TIMESTAMP(datahora), "America/Sao_Paulo") AS DATETIME) timestamp_gps, + SAFE_CAST(velocidade AS INT64) velocidade, + concat( + ifnull(REGEXP_EXTRACT(linha, r'[A-Z]+'), ""), + ifnull(REGEXP_EXTRACT(linha, r'[0-9]+'), "") + ) as linha, + SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), "America/Sao_Paulo") AS DATETIME) timestamp_captura, + SAFE_CAST(data AS DATE) data, + SAFE_CAST(hora AS INT64) hora +from + {{var('sppo_registros_staging')}} as t \ No newline at end of file diff --git a/queries/models/monitoramento/staging/aux_gps_filtrada b/queries/models/monitoramento/staging/aux_gps_filtrada new file mode 100644 index 00000000..4542e0e2 --- /dev/null +++ b/queries/models/monitoramento/staging/aux_gps_filtrada @@ -0,0 +1,77 @@ +{{ + config( + materialized='ephemeral' + ) +}} + + /* +Descrição: +Filtragem e tratamento básico de registros de gps. +1. Filtra registros que estão fora de uma caixa que contém a área do município de Rio de Janeiro. +2. Filtra registros antigos. Remove registros que tem diferença maior que 1 minuto entre o timestamp_captura e timestamp_gps. +3. Muda o nome de variáveis para o padrão do projeto. + - id_veiculo --> ordem +*/ +WITH +box AS ( + /*1. Geometria de caixa que contém a área do município de Rio de Janeiro.*/ + SELECT + * + FROM + {{ var('limites_caixa') }} +), +gps AS ( + /*2. Filtra registros antigos. Remove registros que tem diferença maior que 1 minuto entre o timestamp_captura e timestamp_gps.*/ + SELECT + *, + ST_GEOGPOINT(longitude, latitude) posicao_veiculo_geo + FROM + {{ ref('aux_gps' ~ var('fonte_gps')) }} + WHERE + data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") + {% if is_incremental() -%} + AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" + {%- endif -%} +), +realocacao as ( + SELECT + g.* except(linha), + coalesce(r.servico_realocado, g.linha) as linha + FROM + gps g + LEFT JOIN + {{ ref('aux_gps_realocacao' ~ var('fonte_gps')) }} r + ON + g.ordem = r.id_veiculo + and g.timestamp_gps = r.timestamp_gps +), +filtrada AS ( + /*1,2, e 3. Muda o nome de variáveis para o padrão do projeto.*/ + SELECT + ordem AS id_veiculo, + latitude, + longitude, + posicao_veiculo_geo, + velocidade, + linha, + timestamp_gps, + timestamp_captura, + data, + hora, + row_number() over (partition by ordem, timestamp_gps, linha) rn + FROM + realocacao + WHERE + ST_INTERSECTSBOX(posicao_veiculo_geo, + ( SELECT min_longitude FROM box), + ( SELECT min_latitude FROM box), + ( SELECT max_longitude FROM box), + ( SELECT max_latitude FROM box)) + ) +SELECT + * except(rn), + "{{ var("version") }}" as versao +FROM + filtrada +WHERE + rn = 1 \ No newline at end of file diff --git a/queries/models/monitoramento/staging/aux_gps_parada.sql b/queries/models/monitoramento/staging/aux_gps_parada.sql new file mode 100644 index 00000000..8ab66de9 --- /dev/null +++ b/queries/models/monitoramento/staging/aux_gps_parada.sql @@ -0,0 +1,77 @@ +{{ + config( + materialized='ephemeral' + ) +}} + +/* +Descrição: +Identifica veículos parados em terminais ou garagens conhecidas. +1. Selecionamos os terminais conhecidos e uma geometria do tipo polígono (Polygon) que contém buracos nas +localizações das garagens. +2. Calculamos as distâncias do veículos em relação aos terminais conhecidos. Definimos aqui a coluna 'nrow', +que identifica qual o terminal que está mais próximo do ponto informado. No passo final, recuperamos apenas +os dados com nrow = 1 (menor distância em relação à posição do veículo) +3. Definimos uma distancia_limiar_parada. Caso o veículo esteja a uma distância menor que este valor de uma +parada, será considerado como parado no terminal com menor distancia. +4. Caso o veiculo não esteja intersectando o polígono das garagens, ele será considerado como parado dentro +de uma garagem (o polígono é vazado nas garagens, a não intersecção implica em estar dentro de um dos 'buracos'). +*/ +WITH + terminais as ( + -- 1. Selecionamos terminais, criando uma geometria de ponto para cada. + select + ST_GEOGPOINT(longitude, latitude) ponto_parada, nome_terminal nome_parada, 'terminal' tipo_parada + from {{ var('sppo_terminais') }} + ), + garagem_polygon AS ( + -- 1. Selecionamos o polígono das garagens. + SELECT ST_GEOGFROMTEXT(WKT,make_valid => true) AS poly + FROM {{ var('polygon_garagem') }} + ), + distancia AS ( + --2. Calculamos as distâncias e definimos nrow + SELECT + id_veiculo, + timestamp_gps, + data, + linha, + posicao_veiculo_geo, + nome_parada, + tipo_parada, + ROUND(ST_DISTANCE(posicao_veiculo_geo, ponto_parada), 1) distancia_parada, + ROW_NUMBER() OVER (PARTITION BY timestamp_gps, id_veiculo, linha ORDER BY ST_DISTANCE(posicao_veiculo_geo, ponto_parada)) nrow + FROM terminais p + JOIN ( + SELECT + id_veiculo, + timestamp_gps, + data, + linha, + posicao_veiculo_geo + FROM + {{ ref('aux_gps_filtrada' ~ var('fonte_gps')) }} + {%if not flags.FULL_REFRESH %} + WHERE + data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") + AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" + {% endif %} + ) r + on 1=1 + ) +SELECT + data, + id_veiculo, + timestamp_gps, + linha, + /* + 3. e 4. Identificamos o status do veículo como 'terminal', 'garagem' (para os veículos parados) ou + null (para os veículos mais distantes de uma parada que o limiar definido) + */ + case + when distancia_parada < {{ var('distancia_limiar_parada') }} then tipo_parada + when not ST_INTERSECTS(posicao_veiculo_geo, (SELECT poly FROM garagem_polygon)) then 'garagem' + else null + end tipo_parada, +FROM distancia +WHERE nrow = 1 \ No newline at end of file diff --git a/queries/models/monitoramento/staging/aux_gps_realocacao.sql b/queries/models/monitoramento/staging/aux_gps_realocacao.sql new file mode 100644 index 00000000..e4c407d4 --- /dev/null +++ b/queries/models/monitoramento/staging/aux_gps_realocacao.sql @@ -0,0 +1,76 @@ +{% if var("fifteen_minutes") == "_15_minutos" %} +{{ + config( + materialized='ephemeral', + ) +}} +{% else %} +{{ + config( + materialized='incremental', + partition_by={ + "field":"data", + "data_type": "date", + "granularity":"day" + }, + alias=this.name ~ var('fonte_gps') + ) +}} +{% endif %} + +-- 1. Filtra realocações válidas dentro do intervalo de GPS avaliado +with realocacao as ( + select + * except(datetime_saida), + case + when datetime_saida is null then datetime_operacao + else datetime_saida + end as datetime_saida, + from + {{ ref('aux_realocacao'~ var('fonte_gps')) }} + where + -- Realocação deve acontecer após o registro de GPS e até 1 hora depois + datetime_diff(datetime_operacao, datetime_entrada, minute) between 0 and 60 + and data between DATE("{{var('date_range_start')}}") + and DATE(datetime_add("{{var('date_range_end')}}", interval 1 hour)) + and (datetime_saida >= datetime("{{var('date_range_start')}}") or datetime_operacao >= datetime("{{var('date_range_start')}}")) +), +-- 2. Altera registros de GPS com servicos realocados +gps as ( + select + ordem, + timestamp_gps, + linha, + data, + hora + from {{ ref('aux_gps' ~ var('fonte_gps') }} + where + data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") + and timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <= "{{var('date_range_end')}}" +), +combinacao as ( + select + r.id_veiculo, + g.timestamp_gps, + g.linha as servico_gps, + r.servico as servico_realocado, + r.datetime_operacao as datetime_realocacao, + g.data, + g.hora + from gps g + inner join realocacao r + on + g.ordem = r.id_veiculo + and g.linha != r.servico + and g.timestamp_gps between r.datetime_entrada and r.datetime_saida +) +-- Filtra realocacao mais recente para cada timestamp +select + * except(rn) +from ( + select + *, + row_number() over (partition by id_veiculo, timestamp_gps order by datetime_realocacao desc) as rn + from combinacao +) +where rn = 1 \ No newline at end of file diff --git a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql new file mode 100644 index 00000000..16e31889 --- /dev/null +++ b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql @@ -0,0 +1,95 @@ +{{ + config( + materialized='ephemeral' + ) +}} + +/* +Descrição: +Calcula se o veículo está dentro do trajeto correto dado o traçado (shape) cadastrado no SIGMOB em relação à linha que está sendo +transmitida. +1. Calcula as intersecções definindo um 'buffer', utilizado por st_dwithin para identificar se o ponto está à uma +distância menor ou igual ao tamanho do buffer em relação ao traçado definido no SIGMOB. +2. Calcula um histórico de intersecções nos ultimos 10 minutos de registros de cada carro. Definimos que o carro é +considerado fora do trajeto definido se a cada 10 minutos, ele não esteve dentro do traçado planejado pelo menos uma +vez. +3. Identifica se a linha informada no registro capturado existe nas definições presentes no SIGMOB. +4. Definimos em outra tabela uma 'data_versao_efetiva', esse passo serve tanto para definir qual versão do SIGMOB utilizaremos em +caso de falha na captura, quanto para definir qual versão será utilizada para o cálculo retroativo do histórico de registros que temos. +5. Como não conseguimos identificar o itinerário que o carro está realizando, no passo counts, os resultados de +intersecções são dobrados, devido ao fato de cada linha apresentar dois itinerários possíveis (ida/volta). Portanto, +ao final, realizamos uma agregação LOGICAL_OR que é true caso o carro esteja dentro do traçado de algum dos itinerários +possíveis para a linha informada. +*/ +WITH + registros AS ( + SELECT + id_veiculo, + linha, + latitude, + longitude, + data, + posicao_veiculo_geo, + timestamp_gps + FROM + {{ ref('aux_gps_filtrada' ~ var('fonte_gps')) }} r + {% if not flags.FULL_REFRESH -%} + WHERE + data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") + AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" + {%- endif -%} + ), + intersec AS ( + SELECT + r.*, + s.data_versao, + s.linha_gtfs, + s.route_id, + -- 1. Buffer e intersecções + CASE + WHEN st_dwithin(shape, posicao_veiculo_geo, {{ var('tamanho_buffer_metros') }}) THEN TRUE + ELSE FALSE + END AS flag_trajeto_correto, + -- 2. Histórico de intersecções nos últimos 10 minutos a partir da timestamp_gps atual + CASE + WHEN + COUNT(CASE WHEN st_dwithin(shape, posicao_veiculo_geo, {{ var('tamanho_buffer_metros') }}) THEN 1 END) + OVER (PARTITION BY id_veiculo + ORDER BY UNIX_SECONDS(TIMESTAMP(timestamp_gps)) + RANGE BETWEEN {{ var('intervalo_max_desvio_segundos') }} PRECEDING AND CURRENT ROW) >= 1 + THEN True + ELSE False + END AS flag_trajeto_correto_hist, + -- 3. Identificação de cadastro da linha no SIGMOB + CASE WHEN s.linha_gtfs IS NULL THEN False ELSE True END AS flag_linha_existe_sigmob, + -- 4. Join com data_versao_efetiva para definição de quais shapes serão considerados no cálculo das flags + FROM registros r + LEFT JOIN ( + SELECT * + FROM {{ ref('shapes_geom') }} + WHERE id_modal_smtr in ({{ var('sppo_id_modal_smtr')|join(', ') }}) + AND data_versao = "{{ var('versao_fixa_sigmob')}}" + ) s + ON + r.linha = s.linha_gtfs + ) + -- 5. Agregação com LOGICAL_OR para evitar duplicação de registros + SELECT + id_veiculo, + linha, + linha_gtfs, + route_id, + data, + timestamp_gps, + LOGICAL_OR(flag_trajeto_correto) AS flag_trajeto_correto, + LOGICAL_OR(flag_trajeto_correto_hist) AS flag_trajeto_correto_hist, + LOGICAL_OR(flag_linha_existe_sigmob) AS flag_linha_existe_sigmob, + FROM intersec i + GROUP BY + id_veiculo, + linha, + linha_gtfs, + route_id, + data, + data_versao, + timestamp_gps diff --git a/queries/models/monitoramento/staging/aux_gps_velocidade.sql b/queries/models/monitoramento/staging/aux_gps_velocidade.sql new file mode 100644 index 00000000..a3eaf64a --- /dev/null +++ b/queries/models/monitoramento/staging/aux_gps_velocidade.sql @@ -0,0 +1,91 @@ +{{ + config( + materialized='ephemeral' + ) +}} +/* +Descrição: +Estimativa das velocidades dos veículos nos últimos 10 minutos contados a partir da timestamp_gps atual. +Essa metodologia serve para determinar quais carros estão em movimento e quais estão parados. +1. Calculamos a velocidade do veículo no último trecho de 10 minutos de operação. +A implementação utiliza a função 'first_value' com uma janela (cláusula 'over') de até 10 minutos anteriores à +timestamp_gps atual e calcula a distância do ponto mais antigo (o first_value na janela) ao ponto atual (posicao_veiculo_geo). +Dividimos essa distância pela diferença de tempo entre a timestamp_gps atual e a timestamp_gps do ponto mais +antigo da janela (o qual recuperamos novamente com o uso de first_value). +Esta diferença de tempo (datetime_diff) é calculada em segundos, portanto multiplicamos o resultado da divisão por um fator +3.6 para que a velocidade esteja em quilômetros por hora. O resultado final é arrendondado sem casas decimais. +Por fim, cobrimos esse cálculo com a função 'if_null' e retornamos zero para a velocidade em casos onde a divisão retornaria +um valor nulo. +2. Após o calculo da velocidade, definimos a coluna 'status_movimento'. Veículos abaixo da 'velocidade_limiar_parado', são +considerados como 'parado'. Caso contrário, são considerados 'andando' +*/ +with + t_velocidade as ( + select + data, + id_veiculo, + timestamp_gps, + linha, + ST_DISTANCE( + posicao_veiculo_geo, + lag(posicao_veiculo_geo) over ( + partition by id_veiculo, linha + order by timestamp_gps) + ) distancia, + IFNULL( + SAFE_DIVIDE( + ST_DISTANCE( + posicao_veiculo_geo, + lag(posicao_veiculo_geo) over ( + partition by id_veiculo, linha + order by timestamp_gps) + ), + DATETIME_DIFF( + timestamp_gps, + lag(timestamp_gps) over ( + partition by id_veiculo, linha + order by timestamp_gps), + SECOND + )), + 0 + ) * 3.6 velocidade + FROM {{ ref("aux_gps_filtrada" ~ var('fonte_gps')) }} + {%if not flags.FULL_REFRESH -%} + WHERE + data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") + AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" + {%- endif -%} + ), + medias as ( + select + data, + id_veiculo, + timestamp_gps, + linha, + distancia, + velocidade, # velocidade do pontual + AVG(velocidade) OVER ( + PARTITION BY id_veiculo, linha + ORDER BY unix_seconds(timestamp(timestamp_gps)) + RANGE BETWEEN {{ var('janela_movel_velocidade') }} PRECEDING AND CURRENT ROW + ) velocidade_media # velocidade com média móvel + from t_velocidade + ) +SELECT + timestamp_gps, + data, + id_veiculo, + linha, + distancia, + ROUND( + CASE WHEN velocidade_media > {{ var('velocidade_maxima') }} + THEN {{ var('velocidade_maxima') }} + ELSE velocidade_media + END, + 1) as velocidade, + -- 2. Determinação do estado de movimento do veículo. + case + when velocidade_media < {{ var('velocidade_limiar_parado') }} then false + else true + end flag_em_movimento, +FROM medias \ No newline at end of file diff --git a/queries/models/monitoramento/staging/aux_realocacao.sql b/queries/models/monitoramento/staging/aux_realocacao.sql new file mode 100644 index 00000000..148a47af --- /dev/null +++ b/queries/models/monitoramento/staging/aux_realocacao.sql @@ -0,0 +1,17 @@ +{{ config(alias=this.name ~ var('fonte_gps')) }} + +SELECT + SAFE_CAST(id_veiculo AS STRING) id_veiculo, + SAFE_CAST(DATETIME(TIMESTAMP(datetime_operacao), "America/Sao_Paulo") AS DATETIME) datetime_operacao, + concat( + ifnull(REGEXP_EXTRACT(servico, r'[A-Z]+'), ""), + ifnull(REGEXP_EXTRACT(servico, r'[0-9]+'), "") + ) as servico, + SAFE_CAST(DATETIME(TIMESTAMP(datetime_entrada), "America/Sao_Paulo") AS DATETIME) as datetime_entrada, + SAFE_CAST(DATETIME(TIMESTAMP(datetime_saida), "America/Sao_Paulo") AS DATETIME) as datetime_saida, + SAFE_CAST(DATETIME(TIMESTAMP(timestamp_processamento), "America/Sao_Paulo") AS DATETIME) as timestamp_processamento, + SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), "America/Sao_Paulo") AS DATETIME) as timestamp_captura, + data, + hora +FROM + {{var('sppo_realocacao_staging')}} as t \ No newline at end of file From ab921227aa728411ca09e957ad1b9ded8985112e Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 31 Oct 2024 15:40:58 -0300 Subject: [PATCH 02/10] add variaveis modo_gps e fonte_gps --- queries/dbt_project.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index bd594b10..9fdc4079 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -186,6 +186,9 @@ vars: ### Encontro de Contas ### encontro_contas_modo: "" + modo_gps: "" + fonte_gps: "" + tests: rj_smtr: where: "DATA BETWEEN DATE('__date_range_start__') AND DATE('__date_range_end__')" From 764b3e4e189c4676cabe717ee5d01235522a0faa Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 31 Oct 2024 15:41:12 -0300 Subject: [PATCH 03/10] atualiza modelos --- queries/models/monitoramento/gps.sql | 240 ++++++++---------- .../models/monitoramento/staging/aux_gps.sql | 32 +-- .../monitoramento/staging/aux_gps_filtrada | 77 ------ .../staging/aux_gps_filtrada.sql | 54 ++++ .../monitoramento/staging/aux_gps_parada.sql | 148 +++++------ .../staging/aux_gps_realocacao.sql | 134 +++++----- .../staging/aux_gps_trajeto_correto.sql | 176 ++++++------- .../staging/aux_gps_velocidade.sql | 145 +++++------ .../monitoramento/staging/aux_realocacao.sql | 40 +-- 9 files changed, 492 insertions(+), 554 deletions(-) delete mode 100644 queries/models/monitoramento/staging/aux_gps_filtrada create mode 100644 queries/models/monitoramento/staging/aux_gps_filtrada.sql diff --git a/queries/models/monitoramento/gps.sql b/queries/models/monitoramento/gps.sql index ae238e2e..6c405b39 100644 --- a/queries/models/monitoramento/gps.sql +++ b/queries/models/monitoramento/gps.sql @@ -1,140 +1,116 @@ {{ - config( - materialized='incremental', - partition_by={ - 'field':"data", - 'data_type':'date', - 'granularity': 'day' - }, - tags=['geolocalizacao'] - ) + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + tags=["geolocalizacao"], + alias=this.name ~ var("modo_gps") ~ var("fonte_gps"), + ) }} -/* -Descrição: -Junção dos passos de tratamento, junta as informações extras que definimos a partir dos registros -capturados. -Para descrição detalhada de como cada coluna é calculada, consulte a documentação de cada uma das tabelas -utilizadas abaixo. -1. registros_filtrada: filtragem e tratamento básico dos dados brutos capturados. -2. aux_registros_velocidade: estimativa da velocidade de veículo a cada ponto registrado e identificação -do estado de movimento ('parado', 'andando') -3. aux_registros_parada: identifica veículos parados em terminais ou garagens conhecidas -4. aux_registros_flag_trajeto_correto: calcula intersecções das posições registradas para cada veículo -com o traçado da linha informada. -5. As junções (joins) são feitas sobre o id_veículo e a timestamp_gps. -*/ -WITH - registros as ( - -- 1. registros_filtrada - SELECT - id_veiculo, - timestamp_gps, - timestamp_captura, - velocidade, - linha, - latitude, - longitude, - FROM {{ ref('aux_gps_filtrada' ~ var('fonte_gps')) }} - {% if is_incremental() -%} - WHERE - data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") - AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" - {%- endif -%} - ), - velocidades AS ( - -- 2. velocidades - SELECT - id_veiculo, timestamp_gps, linha, velocidade, distancia, flag_em_movimento - FROM - {{ ref('aux_gps_velocidade' ~ var('fonte_gps')) }} - ), - paradas as ( - -- 3. paradas - SELECT - id_veiculo, timestamp_gps, linha, tipo_parada, - FROM {{ ref('aux_gps_parada' ~ var('fonte_gps')) }} - ), - flags AS ( - -- 4. flag_trajeto_correto - SELECT - id_veiculo, - timestamp_gps, - linha, - route_id, - flag_linha_existe_sigmob, - flag_trajeto_correto, - flag_trajeto_correto_hist - FROM - {{ ref('aux_gps_trajeto_correto' ~ var('fonte_gps')) }} - ) + +with + registros as ( + -- 1. registros_filtrada + select + id_veiculo, + datetime_gps, + datetime_captura, + velocidade, + servico, + latitude, + longitude, + from {{ ref("aux_gps_filtrada") }} + {% if is_incremental() -%} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + and datetime_gps > "{{var('date_range_start')}}" + and datetime_gps <= "{{var('date_range_end')}}" + {%- endif -%} + ), + velocidades as ( + -- 2. velocidades + select + id_veiculo, + datetime_gps, + servico, + velocidade, + distancia, + indicador_em_movimento + from {{ ref("aux_gps_velocidade") }} + ), + paradas as ( + -- 3. paradas + select id_veiculo, datetime_gps, servico, tipo_parada, + from {{ ref("aux_gps_parada") }} + ), + indicadores as ( + -- 4. indicador_trajeto_correto + select + id_veiculo, + datetime_gps, + servico, + route_id, + indicador_linha_existe_sigmob, + indicador_trajeto_correto, + indicador_trajeto_correto + from {{ ref("aux_gps_trajeto_correto") }} + ) -- 5. Junção final -SELECT - "SPPO" modo, - r.timestamp_gps, - date(r.timestamp_gps) data, - extract(time from r.timestamp_gps) hora, - r.id_veiculo, - r.linha as servico, - r.latitude, - r.longitude, - CASE - WHEN - flag_em_movimento IS true AND flag_trajeto_correto_hist is true - THEN true - ELSE false - END flag_em_operacao, - v.flag_em_movimento, - p.tipo_parada, - flag_linha_existe_sigmob, - flag_trajeto_correto, - flag_trajeto_correto_hist, - CASE - WHEN flag_em_movimento IS true AND flag_trajeto_correto_hist is true - THEN 'Em operação' - WHEN flag_em_movimento is true and flag_trajeto_correto_hist is false - THEN 'Operando fora trajeto' - WHEN flag_em_movimento is false - THEN - CASE - WHEN tipo_parada is not null - THEN concat("Parado ", tipo_parada) - ELSE - CASE - WHEN flag_trajeto_correto_hist is true - THEN 'Parado trajeto correto' - ELSE 'Parado fora trajeto' - END - END - END status, - r.velocidade velocidade_instantanea, - v.velocidade velocidade_estimada_10_min, - v.distancia, - "{{ var("version") }}" as versao -FROM - registros r +select + date(r.datetime_gps) data, + r.datetime_gps, + r.id_veiculo, + r.servico, + r.latitude, + r.longitude, + case + when indicador_em_movimento is true and indicador_trajeto_correto is true + then 'Em operação' + when indicador_em_movimento is true and indicador_trajeto_correto is false + then 'Operando fora trajeto' + when indicador_em_movimento is false + then + case + when tipo_parada is not null + then concat("Parado ", tipo_parada) + else + case + when indicador_trajeto_correto is true + then 'Parado trajeto correto' + else 'Parado fora trajeto' + end + end + end status, + r.velocidade as velocidade_instantanea, + v.velocidade as velocidade_estimada_10_min, + v.distancia, + '{{ var("version") }}' as versao, + current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao +from registros r -JOIN - flags f -ON - r.id_veiculo = f.id_veiculo - AND r.timestamp_gps = f.timestamp_gps - AND r.linha = f.linha +join + indicadores i + on r.id_veiculo = i.id_veiculo + and r.datetime_gps = i.datetime_gps + and r.servico = i.servico -JOIN - velocidades v -ON - r.id_veiculo = v.id_veiculo - AND r.timestamp_gps = v.timestamp_gps - AND r.linha = v.linha +join + velocidades v + on r.id_veiculo = v.id_veiculo + and r.datetime_gps = v.datetime_gps + and r.servico = v.servico -JOIN - paradas p -ON - r.id_veiculo = p.id_veiculo - AND r.timestamp_gps = p.timestamp_gps - AND r.linha = p.linha +join + paradas p + on r.id_veiculo = p.id_veiculo + and r.datetime_gps = p.datetime_gps + and r.servico = p.servico {% if is_incremental() -%} - WHERE - date(r.timestamp_gps) between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") - AND r.timestamp_gps > "{{var('date_range_start')}}" and r.timestamp_gps <="{{var('date_range_end')}}" + where + date(r.datetime_gps) between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + and r.datetime_gps > "{{var('date_range_start')}}" + and r.datetime_gps <= "{{var('date_range_end')}}" {%- endif -%} diff --git a/queries/models/monitoramento/staging/aux_gps.sql b/queries/models/monitoramento/staging/aux_gps.sql index c46fbcd8..a38e8756 100644 --- a/queries/models/monitoramento/staging/aux_gps.sql +++ b/queries/models/monitoramento/staging/aux_gps.sql @@ -1,17 +1,19 @@ -{{ config(alias=this.name ~ var('fonte_gps')) }} +{{ config(alias=this.name ~ var("fonte_gps")) }} -SELECT - SAFE_CAST(ordem AS STRING) ordem, - SAFE_CAST(REPLACE(latitude, ',', '.') AS FLOAT64) latitude, - SAFE_CAST(REPLACE(longitude, ',', '.') AS FLOAT64) longitude, - SAFE_CAST(DATETIME(TIMESTAMP(datahora), "America/Sao_Paulo") AS DATETIME) timestamp_gps, - SAFE_CAST(velocidade AS INT64) velocidade, +select + data, + safe_cast( + datetime(timestamp(datahora), "America/Sao_Paulo") as datetime + ) datetime_gps, + safe_cast(ordem as string) id_veiculo, concat( - ifnull(REGEXP_EXTRACT(linha, r'[A-Z]+'), ""), - ifnull(REGEXP_EXTRACT(linha, r'[0-9]+'), "") - ) as linha, - SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), "America/Sao_Paulo") AS DATETIME) timestamp_captura, - SAFE_CAST(data AS DATE) data, - SAFE_CAST(hora AS INT64) hora -from - {{var('sppo_registros_staging')}} as t \ No newline at end of file + ifnull(regexp_extract(linha, r'[A-Z]+'), ""), + ifnull(regexp_extract(linha, r'[0-9]+'), "") + ) as servico, + safe_cast(replace(latitude, ',', '.') as float64) latitude, + safe_cast(replace(longitude, ',', '.') as float64) longitude, + safe_cast( + datetime(timestamp(timestamp_captura), "America/Sao_Paulo") as datetime + ) datetime_captura, + safe_cast(velocidade as int64) velocidade +from {{ var("sppo_registros_staging") }} diff --git a/queries/models/monitoramento/staging/aux_gps_filtrada b/queries/models/monitoramento/staging/aux_gps_filtrada deleted file mode 100644 index 4542e0e2..00000000 --- a/queries/models/monitoramento/staging/aux_gps_filtrada +++ /dev/null @@ -1,77 +0,0 @@ -{{ - config( - materialized='ephemeral' - ) -}} - - /* -Descrição: -Filtragem e tratamento básico de registros de gps. -1. Filtra registros que estão fora de uma caixa que contém a área do município de Rio de Janeiro. -2. Filtra registros antigos. Remove registros que tem diferença maior que 1 minuto entre o timestamp_captura e timestamp_gps. -3. Muda o nome de variáveis para o padrão do projeto. - - id_veiculo --> ordem -*/ -WITH -box AS ( - /*1. Geometria de caixa que contém a área do município de Rio de Janeiro.*/ - SELECT - * - FROM - {{ var('limites_caixa') }} -), -gps AS ( - /*2. Filtra registros antigos. Remove registros que tem diferença maior que 1 minuto entre o timestamp_captura e timestamp_gps.*/ - SELECT - *, - ST_GEOGPOINT(longitude, latitude) posicao_veiculo_geo - FROM - {{ ref('aux_gps' ~ var('fonte_gps')) }} - WHERE - data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") - {% if is_incremental() -%} - AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" - {%- endif -%} -), -realocacao as ( - SELECT - g.* except(linha), - coalesce(r.servico_realocado, g.linha) as linha - FROM - gps g - LEFT JOIN - {{ ref('aux_gps_realocacao' ~ var('fonte_gps')) }} r - ON - g.ordem = r.id_veiculo - and g.timestamp_gps = r.timestamp_gps -), -filtrada AS ( - /*1,2, e 3. Muda o nome de variáveis para o padrão do projeto.*/ - SELECT - ordem AS id_veiculo, - latitude, - longitude, - posicao_veiculo_geo, - velocidade, - linha, - timestamp_gps, - timestamp_captura, - data, - hora, - row_number() over (partition by ordem, timestamp_gps, linha) rn - FROM - realocacao - WHERE - ST_INTERSECTSBOX(posicao_veiculo_geo, - ( SELECT min_longitude FROM box), - ( SELECT min_latitude FROM box), - ( SELECT max_longitude FROM box), - ( SELECT max_latitude FROM box)) - ) -SELECT - * except(rn), - "{{ var("version") }}" as versao -FROM - filtrada -WHERE - rn = 1 \ No newline at end of file diff --git a/queries/models/monitoramento/staging/aux_gps_filtrada.sql b/queries/models/monitoramento/staging/aux_gps_filtrada.sql new file mode 100644 index 00000000..b0fe0b3d --- /dev/null +++ b/queries/models/monitoramento/staging/aux_gps_filtrada.sql @@ -0,0 +1,54 @@ +{{ config(materialized="ephemeral") }} + +with + box as ( + /* 1. Geometria de caixa que contém a área do município de Rio de Janeiro.*/ + select * from {{ var("limites_caixa") }} + ), + gps as ( + /* 2. Filtra registros antigos. Remove registros que tem diferença maior que 1 minuto entre o timestamp_captura e datetime_gps.*/ + select *, st_geogpoint(longitude, latitude) posicao_veiculo_geo + from {{ ref("aux_gps" ~ var("fonte_gps")) }} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + {% if is_incremental() -%} + and datetime_gps > "{{var('date_range_start')}}" + and datetime_gps <= "{{var('date_range_end')}}" + {%- endif -%} + ), + realocacao as ( + select g.* except (servico), coalesce(r.servico_realocado, g.servico) as servico + from gps g + left join + {{ ref("aux_gps_realocacao" ~ var("fonte_gps")) }} r + on g.id_veiculo = r.id_veiculo + and g.datetime_gps = r.datetime_gps + ), + filtrada as ( + /* 1,2, e 3. Muda o nome de variáveis para o padrão do projeto.*/ + select + data, + datetime_gps, + id_veiculo, + servico, + latitude, + longitude, + posicao_veiculo_geo, + datetime_captura, + velocidade, + row_number() over (partition by id_veiculo, datetime_gps, servico) rn + from realocacao + where + st_intersectsbox( + posicao_veiculo_geo, + (select min_longitude from box), + (select min_latitude from box), + (select max_longitude from box), + (select max_latitude from box) + ) + ) +select * except (rn), "{{ var(" version ") }}" as versao +from filtrada +where rn = 1 diff --git a/queries/models/monitoramento/staging/aux_gps_parada.sql b/queries/models/monitoramento/staging/aux_gps_parada.sql index 8ab66de9..000e8c8a 100644 --- a/queries/models/monitoramento/staging/aux_gps_parada.sql +++ b/queries/models/monitoramento/staging/aux_gps_parada.sql @@ -1,77 +1,79 @@ -{{ - config( - materialized='ephemeral' - ) -}} +{{ config(materialized="ephemeral") }} -/* -Descrição: -Identifica veículos parados em terminais ou garagens conhecidas. -1. Selecionamos os terminais conhecidos e uma geometria do tipo polígono (Polygon) que contém buracos nas -localizações das garagens. -2. Calculamos as distâncias do veículos em relação aos terminais conhecidos. Definimos aqui a coluna 'nrow', -que identifica qual o terminal que está mais próximo do ponto informado. No passo final, recuperamos apenas -os dados com nrow = 1 (menor distância em relação à posição do veículo) -3. Definimos uma distancia_limiar_parada. Caso o veículo esteja a uma distância menor que este valor de uma -parada, será considerado como parado no terminal com menor distancia. -4. Caso o veiculo não esteja intersectando o polígono das garagens, ele será considerado como parado dentro -de uma garagem (o polígono é vazado nas garagens, a não intersecção implica em estar dentro de um dos 'buracos'). -*/ -WITH - terminais as ( - -- 1. Selecionamos terminais, criando uma geometria de ponto para cada. - select - ST_GEOGPOINT(longitude, latitude) ponto_parada, nome_terminal nome_parada, 'terminal' tipo_parada - from {{ var('sppo_terminais') }} - ), - garagem_polygon AS ( - -- 1. Selecionamos o polígono das garagens. - SELECT ST_GEOGFROMTEXT(WKT,make_valid => true) AS poly - FROM {{ var('polygon_garagem') }} - ), - distancia AS ( - --2. Calculamos as distâncias e definimos nrow - SELECT - id_veiculo, - timestamp_gps, - data, - linha, - posicao_veiculo_geo, - nome_parada, - tipo_parada, - ROUND(ST_DISTANCE(posicao_veiculo_geo, ponto_parada), 1) distancia_parada, - ROW_NUMBER() OVER (PARTITION BY timestamp_gps, id_veiculo, linha ORDER BY ST_DISTANCE(posicao_veiculo_geo, ponto_parada)) nrow - FROM terminais p - JOIN ( - SELECT - id_veiculo, - timestamp_gps, - data, - linha, - posicao_veiculo_geo - FROM - {{ ref('aux_gps_filtrada' ~ var('fonte_gps')) }} - {%if not flags.FULL_REFRESH %} - WHERE - data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") - AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" - {% endif %} - ) r - on 1=1 - ) -SELECT - data, - id_veiculo, - timestamp_gps, - linha, - /* +with + terminais as ( + -- 1. Selecionamos terminais, criando uma geometria de ponto para cada. + select + st_geogpoint(stop_lon, stop_lat) as ponto_parada, + stop_name as nome_parada, + 'terminal' as tipo_parada + from {{ ref("stops_gtfs") }} + where location_type = "1" + ), + garagens as ( + -- 1. Selecionamos as garagens, , criando uma geometria de ponto para cada. + select + st_geogpoint(stop_lon, stop_lat) as ponto_parada, + stop_name as nome_parada, + 'garagens' as tipo_parada + from {{ ref("stops_gtfs") }} + left join + {{ ref("stop_times_gtfs") }} using (feed_version, feed_start_date, stop_id) + where + pickup_type is null and drop_off_type is null and stop_name like "Garagem%" + ), + pontos_parada as ( + -- Unimos terminais e garagens para obter todos os pontos de parada + select * + from terminais + union all + select * + from garagens + ), + distancia as ( + -- 2. Calculamos as distâncias e definimos nrow + select + id_veiculo, + datetime_gps, + data, + servico, + posicao_veiculo_geo, + nome_parada, + tipo_parada, + round(st_distance(posicao_veiculo_geo, ponto_parada), 1) distancia_parada, + row_number() over ( + partition by datetime_gps, id_veiculo, servico + order by st_distance(posicao_veiculo_geo, ponto_parada) + ) nrow + from pontos_parada p + join + ( + select id_veiculo, datetime_gps, data, servico, posicao_veiculo_geo + from {{ ref("aux_gps_filtrada") }} + {% if not flags.FULL_REFRESH %} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + and datetime_gps > "{{var('date_range_start')}}" + and datetime_gps <= "{{var('date_range_end')}}" + {% endif %} + ) r + on 1 = 1 + ) +select + data, + datetime_gps, + id_veiculo, + servico, + /* 3. e 4. Identificamos o status do veículo como 'terminal', 'garagem' (para os veículos parados) ou null (para os veículos mais distantes de uma parada que o limiar definido) */ - case - when distancia_parada < {{ var('distancia_limiar_parada') }} then tipo_parada - when not ST_INTERSECTS(posicao_veiculo_geo, (SELECT poly FROM garagem_polygon)) then 'garagem' - else null - end tipo_parada, -FROM distancia -WHERE nrow = 1 \ No newline at end of file + case + when distancia_parada < {{ var("distancia_limiar_parada") }} + then tipo_parada + else null + end tipo_parada, +from distancia +where nrow = 1 diff --git a/queries/models/monitoramento/staging/aux_gps_realocacao.sql b/queries/models/monitoramento/staging/aux_gps_realocacao.sql index e4c407d4..bee71918 100644 --- a/queries/models/monitoramento/staging/aux_gps_realocacao.sql +++ b/queries/models/monitoramento/staging/aux_gps_realocacao.sql @@ -1,76 +1,70 @@ {% if var("fifteen_minutes") == "_15_minutos" %} -{{ - config( - materialized='ephemeral', - ) -}} + {{ config(materialized="ephemeral", alias=this.name ~ var("fonte_gps")) }} -- verificar isso {% else %} -{{ - config( - materialized='incremental', - partition_by={ - "field":"data", - "data_type": "date", - "granularity":"day" - }, - alias=this.name ~ var('fonte_gps') - ) -}} + {{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + alias=this.name ~ var("fonte_gps"), + ) + }} {% endif %} -- 1. Filtra realocações válidas dentro do intervalo de GPS avaliado -with realocacao as ( - select - * except(datetime_saida), - case - when datetime_saida is null then datetime_operacao - else datetime_saida - end as datetime_saida, - from - {{ ref('aux_realocacao'~ var('fonte_gps')) }} - where - -- Realocação deve acontecer após o registro de GPS e até 1 hora depois - datetime_diff(datetime_operacao, datetime_entrada, minute) between 0 and 60 - and data between DATE("{{var('date_range_start')}}") - and DATE(datetime_add("{{var('date_range_end')}}", interval 1 hour)) - and (datetime_saida >= datetime("{{var('date_range_start')}}") or datetime_operacao >= datetime("{{var('date_range_start')}}")) -), --- 2. Altera registros de GPS com servicos realocados -gps as ( - select - ordem, - timestamp_gps, - linha, - data, - hora - from {{ ref('aux_gps' ~ var('fonte_gps') }} - where - data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") - and timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <= "{{var('date_range_end')}}" -), -combinacao as ( - select - r.id_veiculo, - g.timestamp_gps, - g.linha as servico_gps, - r.servico as servico_realocado, - r.datetime_operacao as datetime_realocacao, - g.data, - g.hora - from gps g - inner join realocacao r - on - g.ordem = r.id_veiculo - and g.linha != r.servico - and g.timestamp_gps between r.datetime_entrada and r.datetime_saida -) +with + realocacao as ( + select + * except (datetime_saida), + case + when datetime_saida is null then datetime_operacao else datetime_saida + end as datetime_saida, + from {{ ref("aux_realocacao" ~ var("fonte_gps")) }} + where + -- Realocação deve acontecer após o registro de GPS e até 1 hora depois + datetime_diff(datetime_operacao, datetime_entrada, minute) between 0 and 60 + and data between date("{{var('date_range_start')}}") and date( + datetime_add("{{var('date_range_end')}}", interval 1 hour) + ) + and ( + datetime_saida >= datetime("{{var('date_range_start')}}") + or datetime_operacao >= datetime("{{var('date_range_start')}}") + ) + ), + -- 2. Altera registros de GPS com servicos realocados + gps as ( + select id_veiculo, datetime_gps, servico, data, + from {{ ref("aux_gps" ~ var("fonte_gps")) }} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + and datetime_gps > "{{var('date_range_start')}}" + and datetime_gps <= "{{var('date_range_end')}}" + ), + combinacao as ( + select + g.data, + g.datetime_gps, + r.id_veiculo, + g.servico as servico_gps, + r.servico as servico_realocado, + r.datetime_operacao as datetime_realocado + from gps g + inner join + realocacao r + on g.id_veiculo = r.id_veiculo + and g.servico != r.servico + and g.datetime_gps between r.datetime_entrada and r.datetime_saida + ) -- Filtra realocacao mais recente para cada timestamp -select - * except(rn) -from ( - select - *, - row_number() over (partition by id_veiculo, timestamp_gps order by datetime_realocacao desc) as rn - from combinacao -) -where rn = 1 \ No newline at end of file +select * except (rn) +from + ( + select + *, + row_number() over ( + partition by id_veiculo, datetime_gps order by datetime_realocado desc + ) as rn + from combinacao + ) +where rn = 1 diff --git a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql index 16e31889..ef562bb3 100644 --- a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql +++ b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql @@ -1,95 +1,83 @@ -{{ - config( - materialized='ephemeral' - ) -}} +{{ config(materialized="ephemeral") }} -/* -Descrição: -Calcula se o veículo está dentro do trajeto correto dado o traçado (shape) cadastrado no SIGMOB em relação à linha que está sendo -transmitida. -1. Calcula as intersecções definindo um 'buffer', utilizado por st_dwithin para identificar se o ponto está à uma -distância menor ou igual ao tamanho do buffer em relação ao traçado definido no SIGMOB. -2. Calcula um histórico de intersecções nos ultimos 10 minutos de registros de cada carro. Definimos que o carro é -considerado fora do trajeto definido se a cada 10 minutos, ele não esteve dentro do traçado planejado pelo menos uma -vez. -3. Identifica se a linha informada no registro capturado existe nas definições presentes no SIGMOB. -4. Definimos em outra tabela uma 'data_versao_efetiva', esse passo serve tanto para definir qual versão do SIGMOB utilizaremos em -caso de falha na captura, quanto para definir qual versão será utilizada para o cálculo retroativo do histórico de registros que temos. -5. Como não conseguimos identificar o itinerário que o carro está realizando, no passo counts, os resultados de -intersecções são dobrados, devido ao fato de cada linha apresentar dois itinerários possíveis (ida/volta). Portanto, -ao final, realizamos uma agregação LOGICAL_OR que é true caso o carro esteja dentro do traçado de algum dos itinerários -possíveis para a linha informada. -*/ -WITH - registros AS ( - SELECT - id_veiculo, - linha, - latitude, - longitude, - data, - posicao_veiculo_geo, - timestamp_gps - FROM - {{ ref('aux_gps_filtrada' ~ var('fonte_gps')) }} r - {% if not flags.FULL_REFRESH -%} - WHERE - data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") - AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" - {%- endif -%} - ), - intersec AS ( - SELECT - r.*, - s.data_versao, - s.linha_gtfs, - s.route_id, - -- 1. Buffer e intersecções - CASE - WHEN st_dwithin(shape, posicao_veiculo_geo, {{ var('tamanho_buffer_metros') }}) THEN TRUE - ELSE FALSE - END AS flag_trajeto_correto, - -- 2. Histórico de intersecções nos últimos 10 minutos a partir da timestamp_gps atual - CASE - WHEN - COUNT(CASE WHEN st_dwithin(shape, posicao_veiculo_geo, {{ var('tamanho_buffer_metros') }}) THEN 1 END) - OVER (PARTITION BY id_veiculo - ORDER BY UNIX_SECONDS(TIMESTAMP(timestamp_gps)) - RANGE BETWEEN {{ var('intervalo_max_desvio_segundos') }} PRECEDING AND CURRENT ROW) >= 1 - THEN True - ELSE False - END AS flag_trajeto_correto_hist, - -- 3. Identificação de cadastro da linha no SIGMOB - CASE WHEN s.linha_gtfs IS NULL THEN False ELSE True END AS flag_linha_existe_sigmob, - -- 4. Join com data_versao_efetiva para definição de quais shapes serão considerados no cálculo das flags - FROM registros r - LEFT JOIN ( - SELECT * - FROM {{ ref('shapes_geom') }} - WHERE id_modal_smtr in ({{ var('sppo_id_modal_smtr')|join(', ') }}) - AND data_versao = "{{ var('versao_fixa_sigmob')}}" - ) s - ON - r.linha = s.linha_gtfs - ) - -- 5. Agregação com LOGICAL_OR para evitar duplicação de registros - SELECT - id_veiculo, - linha, - linha_gtfs, - route_id, - data, - timestamp_gps, - LOGICAL_OR(flag_trajeto_correto) AS flag_trajeto_correto, - LOGICAL_OR(flag_trajeto_correto_hist) AS flag_trajeto_correto_hist, - LOGICAL_OR(flag_linha_existe_sigmob) AS flag_linha_existe_sigmob, - FROM intersec i - GROUP BY - id_veiculo, - linha, - linha_gtfs, - route_id, - data, - data_versao, - timestamp_gps +with + registros as ( + select + id_veiculo, + servico, + latitude, + longitude, + data, + posicao_veiculo_geo, + datetime_gps + from {{ ref("aux_gps_filtrada") }} r + {% if not flags.FULL_REFRESH -%} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + and datetime_gps > "{{var('date_range_start')}}" + and datetime_gps <= "{{var('date_range_end')}}" + {%- endif -%} + ), + intersec as ( + select + r.*, + s.feed_version, + s.servico, + s.route_id, + -- 1. Histórico de intersecções nos últimos 10 minutos a partir da + -- datetime_gps atual + case + when + count( + case + when + st_dwithin( + shape, + posicao_veiculo_geo, + {{ var("tamanho_buffer_metros") }} + ) + then 1 + end + ) over ( + partition by id_veiculo + order by + unix_seconds(timestamp(datetime_gps)) + range + between {{ var("intervalo_max_desvio_segundos") }} preceding + and current row + ) + >= 1 + then true + else false + end as indicador_trajeto_correto, + -- 2. Join com data_versao_efetiva para definição de quais shapes serão + -- considerados no cálculo do indicador + from registros r + left join + ( + select * + from {{ ref("viagem_planejada_planejamento") }} + left join + {{ ref("shapes_geom_gtfs") }} using ( + feed_version, feed_start_date, shape_id + ) + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + ) s + on r.servico = s.servico + and r.data = s.data + ) +-- 3. Agregação com LOGICAL_OR para evitar duplicação de registros +select + data, + datetime_gps, + id_veiculo, + servico, + route_id, + logical_or(indicador_trajeto_correto) as indicador_trajeto_correto +from intersec i +group by id_veiculo, servico, route_id, data, datetime_gps diff --git a/queries/models/monitoramento/staging/aux_gps_velocidade.sql b/queries/models/monitoramento/staging/aux_gps_velocidade.sql index a3eaf64a..02e40084 100644 --- a/queries/models/monitoramento/staging/aux_gps_velocidade.sql +++ b/queries/models/monitoramento/staging/aux_gps_velocidade.sql @@ -1,91 +1,82 @@ -{{ - config( - materialized='ephemeral' - ) -}} -/* -Descrição: -Estimativa das velocidades dos veículos nos últimos 10 minutos contados a partir da timestamp_gps atual. -Essa metodologia serve para determinar quais carros estão em movimento e quais estão parados. -1. Calculamos a velocidade do veículo no último trecho de 10 minutos de operação. -A implementação utiliza a função 'first_value' com uma janela (cláusula 'over') de até 10 minutos anteriores à -timestamp_gps atual e calcula a distância do ponto mais antigo (o first_value na janela) ao ponto atual (posicao_veiculo_geo). -Dividimos essa distância pela diferença de tempo entre a timestamp_gps atual e a timestamp_gps do ponto mais -antigo da janela (o qual recuperamos novamente com o uso de first_value). -Esta diferença de tempo (datetime_diff) é calculada em segundos, portanto multiplicamos o resultado da divisão por um fator -3.6 para que a velocidade esteja em quilômetros por hora. O resultado final é arrendondado sem casas decimais. -Por fim, cobrimos esse cálculo com a função 'if_null' e retornamos zero para a velocidade em casos onde a divisão retornaria -um valor nulo. -2. Após o calculo da velocidade, definimos a coluna 'status_movimento'. Veículos abaixo da 'velocidade_limiar_parado', são -considerados como 'parado'. Caso contrário, são considerados 'andando' -*/ +{{ config(materialized="ephemeral") }} + with t_velocidade as ( - select - data, - id_veiculo, - timestamp_gps, - linha, - ST_DISTANCE( - posicao_veiculo_geo, - lag(posicao_veiculo_geo) over ( - partition by id_veiculo, linha - order by timestamp_gps) - ) distancia, - IFNULL( - SAFE_DIVIDE( - ST_DISTANCE( + select + data, + id_veiculo, + datetime_gps, + servico, + st_distance( posicao_veiculo_geo, lag(posicao_veiculo_geo) over ( - partition by id_veiculo, linha - order by timestamp_gps) + partition by id_veiculo, servico order by datetime_gps + ) + ) distancia, + ifnull( + safe_divide( + st_distance( + posicao_veiculo_geo, + lag(posicao_veiculo_geo) over ( + partition by id_veiculo, servico order by datetime_gps + ) + ), + datetime_diff( + datetime_gps, + lag(datetime_gps) over ( + partition by id_veiculo, servico order by datetime_gps + ), + second + ) ), - DATETIME_DIFF( - timestamp_gps, - lag(timestamp_gps) over ( - partition by id_veiculo, linha - order by timestamp_gps), - SECOND - )), - 0 - ) * 3.6 velocidade - FROM {{ ref("aux_gps_filtrada" ~ var('fonte_gps')) }} - {%if not flags.FULL_REFRESH -%} - WHERE - data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") - AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" - {%- endif -%} + 0 + ) + * 3.6 velocidade + from {{ ref("aux_gps_filtrada") }} + {% if not flags.FULL_REFRESH -%} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + and datetime_gps > "{{var('date_range_start')}}" + and datetime_gps <= "{{var('date_range_end')}}" + {%- endif -%} ), medias as ( select - data, - id_veiculo, - timestamp_gps, - linha, - distancia, - velocidade, # velocidade do pontual - AVG(velocidade) OVER ( - PARTITION BY id_veiculo, linha - ORDER BY unix_seconds(timestamp(timestamp_gps)) - RANGE BETWEEN {{ var('janela_movel_velocidade') }} PRECEDING AND CURRENT ROW - ) velocidade_media # velocidade com média móvel - from t_velocidade + data, + id_veiculo, + datetime_gps, + servico, + distancia, + velocidade, -- velocidade do pontual + avg(velocidade) over ( + partition by id_veiculo, servico + order by + unix_seconds(timestamp(datetime_gps)) + range + between {{ var("janela_movel_velocidade") }} preceding + and current row + ) velocidade_media -- velocidade com média móvel + from t_velocidade ) -SELECT - timestamp_gps, +select data, + datetime_gps, id_veiculo, - linha, + servico, distancia, - ROUND( - CASE WHEN velocidade_media > {{ var('velocidade_maxima') }} - THEN {{ var('velocidade_maxima') }} - ELSE velocidade_media - END, - 1) as velocidade, - -- 2. Determinação do estado de movimento do veículo. + round( + case + when velocidade_media > {{ var("velocidade_maxima") }} + then {{ var("velocidade_maxima") }} + else velocidade_media + end, + 1 + ) as velocidade, case - when velocidade_media < {{ var('velocidade_limiar_parado') }} then false + when velocidade_media < {{ var("velocidade_limiar_parado") }} + then false else true - end flag_em_movimento, -FROM medias \ No newline at end of file + end indicador_em_movimento, +from medias diff --git a/queries/models/monitoramento/staging/aux_realocacao.sql b/queries/models/monitoramento/staging/aux_realocacao.sql index 148a47af..84558820 100644 --- a/queries/models/monitoramento/staging/aux_realocacao.sql +++ b/queries/models/monitoramento/staging/aux_realocacao.sql @@ -1,17 +1,25 @@ -{{ config(alias=this.name ~ var('fonte_gps')) }} +{{ config(alias=this.name ~ var("fonte_gps")) }} -SELECT - SAFE_CAST(id_veiculo AS STRING) id_veiculo, - SAFE_CAST(DATETIME(TIMESTAMP(datetime_operacao), "America/Sao_Paulo") AS DATETIME) datetime_operacao, - concat( - ifnull(REGEXP_EXTRACT(servico, r'[A-Z]+'), ""), - ifnull(REGEXP_EXTRACT(servico, r'[0-9]+'), "") - ) as servico, - SAFE_CAST(DATETIME(TIMESTAMP(datetime_entrada), "America/Sao_Paulo") AS DATETIME) as datetime_entrada, - SAFE_CAST(DATETIME(TIMESTAMP(datetime_saida), "America/Sao_Paulo") AS DATETIME) as datetime_saida, - SAFE_CAST(DATETIME(TIMESTAMP(timestamp_processamento), "America/Sao_Paulo") AS DATETIME) as timestamp_processamento, - SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura), "America/Sao_Paulo") AS DATETIME) as timestamp_captura, - data, - hora -FROM - {{var('sppo_realocacao_staging')}} as t \ No newline at end of file +select + data, + safe_cast( + datetime(timestamp(datetime_operacao), "America/Sao_Paulo") as datetime + ) datetime_operacao, + safe_cast(id_veiculo as string) id_veiculo, + concat( + ifnull(regexp_extract(servico, r'[A-Z]+'), ""), + ifnull(regexp_extract(servico, r'[0-9]+'), "") + ) as servico, + safe_cast( + datetime(timestamp(datetime_entrada), "America/Sao_Paulo") as datetime + ) as datetime_entrada, + safe_cast( + datetime(timestamp(datetime_saida), "America/Sao_Paulo") as datetime + ) as datetime_saida, + safe_cast( + datetime(timestamp(timestamp_processamento), "America/Sao_Paulo") as datetime + ) as datetime_processamento, + safe_cast( + datetime(timestamp(timestamp_captura), "America/Sao_Paulo") as datetime + ) as datetime_captura, +from {{ var("sppo_realocacao_staging") }} From 123be58a7e8bf80d761d78d1057713cffb243538 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 4 Nov 2024 13:23:57 -0300 Subject: [PATCH 04/10] filtra feed_start_date --- .../monitoramento/staging/aux_gps_parada.sql | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/queries/models/monitoramento/staging/aux_gps_parada.sql b/queries/models/monitoramento/staging/aux_gps_parada.sql index 000e8c8a..ead59870 100644 --- a/queries/models/monitoramento/staging/aux_gps_parada.sql +++ b/queries/models/monitoramento/staging/aux_gps_parada.sql @@ -1,5 +1,19 @@ {{ config(materialized="ephemeral") }} +{% if execute %} + {% set feed_start_date = ( + run_query( + "SELECT DISTINCT feed_start_date FROM rj-smtr-dev.rafael__planejamento.calendario WHERE data BETWEEN DATE('" + ~ var("date_range_start") + ~ "') AND DATE('" + ~ var("date_range_end") + ~ "')" + ) + .columns[0] + .values()[0] + ) %} +{% endif %} + with terminais as ( -- 1. Selecionamos terminais, criando uma geometria de ponto para cada. @@ -7,8 +21,9 @@ with st_geogpoint(stop_lon, stop_lat) as ponto_parada, stop_name as nome_parada, 'terminal' as tipo_parada - from {{ ref("stops_gtfs") }} - where location_type = "1" + -- from {{ ref("stops_gtfs") }} + from `rj-smtr`.`gtfs`.`stops` + where location_type = "1" and feed_start_date = date("{{ feed_start_date }}") ), garagens as ( -- 1. Selecionamos as garagens, , criando uma geometria de ponto para cada. @@ -16,11 +31,17 @@ with st_geogpoint(stop_lon, stop_lat) as ponto_parada, stop_name as nome_parada, 'garagens' as tipo_parada - from {{ ref("stops_gtfs") }} + -- from {{ ref("stops_gtfs") }} + from `rj-smtr`.`gtfs`.`stops` left join - {{ ref("stop_times_gtfs") }} using (feed_version, feed_start_date, stop_id) + -- {{ ref("stop_times_gtfs") }} using (feed_version, feed_start_date, + -- stop_id) + `rj-smtr`.`gtfs`.`stop_times` using (feed_version, feed_start_date, stop_id) where - pickup_type is null and drop_off_type is null and stop_name like "Garagem%" + pickup_type is null + and drop_off_type is null + and stop_name like "Garagem%" + and feed_start_date = date("{{ feed_start_date }}") ), pontos_parada as ( -- Unimos terminais e garagens para obter todos os pontos de parada From 172351904024ae2636dd442ea5eaefdae5c6b9bb Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 4 Nov 2024 13:26:01 -0300 Subject: [PATCH 05/10] corrige modelos --- queries/models/monitoramento/gps.sql | 9 +-------- .../models/monitoramento/staging/aux_gps_filtrada.sql | 2 +- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/queries/models/monitoramento/gps.sql b/queries/models/monitoramento/gps.sql index 6c405b39..665e599f 100644 --- a/queries/models/monitoramento/gps.sql +++ b/queries/models/monitoramento/gps.sql @@ -46,14 +46,7 @@ with ), indicadores as ( -- 4. indicador_trajeto_correto - select - id_veiculo, - datetime_gps, - servico, - route_id, - indicador_linha_existe_sigmob, - indicador_trajeto_correto, - indicador_trajeto_correto + select id_veiculo, datetime_gps, servico, route_id, indicador_trajeto_correto from {{ ref("aux_gps_trajeto_correto") }} ) -- 5. Junção final diff --git a/queries/models/monitoramento/staging/aux_gps_filtrada.sql b/queries/models/monitoramento/staging/aux_gps_filtrada.sql index b0fe0b3d..d0eea9ec 100644 --- a/queries/models/monitoramento/staging/aux_gps_filtrada.sql +++ b/queries/models/monitoramento/staging/aux_gps_filtrada.sql @@ -49,6 +49,6 @@ with (select max_latitude from box) ) ) -select * except (rn), "{{ var(" version ") }}" as versao +select * except (rn), "{{ var('version') }}" as versao from filtrada where rn = 1 From b217ae137e24070f35a1ef3e8dbee85394ed683e Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 4 Nov 2024 13:26:17 -0300 Subject: [PATCH 06/10] altera refs para teste --- .../monitoramento/staging/aux_gps_trajeto_correto.sql | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql index ef562bb3..4eec984e 100644 --- a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql +++ b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql @@ -24,7 +24,6 @@ with select r.*, s.feed_version, - s.servico, s.route_id, -- 1. Histórico de intersecções nos últimos 10 minutos a partir da -- datetime_gps atual @@ -58,9 +57,11 @@ with left join ( select * - from {{ ref("viagem_planejada_planejamento") }} + -- from {{ ref("viagem_planejada") }} --viagem_planejada_planejamento + from `rj-smtr-dev`.`rafael__planejamento`.`viagem_planejada` left join - {{ ref("shapes_geom_gtfs") }} using ( + -- {{ ref("shapes_geom_gtfs") }} using ( + `rj-smtr`.`gtfs`.`shapes_geom` using ( feed_version, feed_start_date, shape_id ) where From 433b34ab661223ab9e4ebb550ae128a235af589a Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 6 Nov 2024 17:12:04 -0300 Subject: [PATCH 07/10] add var buffer_segmento_metros --- queries/dbt_project.yml | 2 ++ .../models/monitoramento/staging/aux_gps_trajeto_correto.sql | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index 9fdc4079..7fc90b89 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -53,6 +53,8 @@ vars: velocidade_limiar_parado: 3 ## Distância mínima para que o veículo seja identificado parado em um terminal ou garagem em aux_registros_parada distancia_limiar_parada: 250 + ## Tamanho do buffer do segmento + buffer_segmento_metros: 20 # flag_trajeto_correto ### [ANTIGO] SIGMOB ### diff --git a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql index 4eec984e..784dde67 100644 --- a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql +++ b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql @@ -35,7 +35,7 @@ with st_dwithin( shape, posicao_veiculo_geo, - {{ var("tamanho_buffer_metros") }} + {{ var("buffer_segmento_metros") }} ) then 1 end From 220dc6c52d20f183fa205c9a8b54c3e56ea50f9f Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 6 Nov 2024 17:20:47 -0300 Subject: [PATCH 08/10] trocar por using --- queries/models/monitoramento/gps.sql | 14 +++----------- .../monitoramento/staging/aux_gps_filtrada.sql | 3 +-- .../staging/aux_gps_trajeto_correto.sql | 3 +-- .../monitoramento/staging/aux_gps_velocidade.sql | 2 +- 4 files changed, 6 insertions(+), 16 deletions(-) diff --git a/queries/models/monitoramento/gps.sql b/queries/models/monitoramento/gps.sql index 665e599f..178a2e0d 100644 --- a/queries/models/monitoramento/gps.sql +++ b/queries/models/monitoramento/gps.sql @@ -84,21 +84,13 @@ from registros r join indicadores i - on r.id_veiculo = i.id_veiculo - and r.datetime_gps = i.datetime_gps - and r.servico = i.servico - +using(id_veiculo, datetime_gps, servico) join velocidades v - on r.id_veiculo = v.id_veiculo - and r.datetime_gps = v.datetime_gps - and r.servico = v.servico - +using(id_veiculo, datetime_gps, servico) join paradas p - on r.id_veiculo = p.id_veiculo - and r.datetime_gps = p.datetime_gps - and r.servico = p.servico +using(id_veiculo, datetime_gps, servico) {% if is_incremental() -%} where date(r.datetime_gps) between date("{{var('date_range_start')}}") and date( diff --git a/queries/models/monitoramento/staging/aux_gps_filtrada.sql b/queries/models/monitoramento/staging/aux_gps_filtrada.sql index d0eea9ec..d2db4638 100644 --- a/queries/models/monitoramento/staging/aux_gps_filtrada.sql +++ b/queries/models/monitoramento/staging/aux_gps_filtrada.sql @@ -23,8 +23,7 @@ with from gps g left join {{ ref("aux_gps_realocacao" ~ var("fonte_gps")) }} r - on g.id_veiculo = r.id_veiculo - and g.datetime_gps = r.datetime_gps + using(id_veiculo, datetime_gps) ), filtrada as ( /* 1,2, e 3. Muda o nome de variáveis para o padrão do projeto.*/ diff --git a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql index 784dde67..ed756e8d 100644 --- a/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql +++ b/queries/models/monitoramento/staging/aux_gps_trajeto_correto.sql @@ -69,8 +69,7 @@ with "{{var('date_range_end')}}" ) ) s - on r.servico = s.servico - and r.data = s.data + using (servico, data) ) -- 3. Agregação com LOGICAL_OR para evitar duplicação de registros select diff --git a/queries/models/monitoramento/staging/aux_gps_velocidade.sql b/queries/models/monitoramento/staging/aux_gps_velocidade.sql index 02e40084..6ac1f19f 100644 --- a/queries/models/monitoramento/staging/aux_gps_velocidade.sql +++ b/queries/models/monitoramento/staging/aux_gps_velocidade.sql @@ -49,7 +49,7 @@ with datetime_gps, servico, distancia, - velocidade, -- velocidade do pontual + velocidade, avg(velocidade) over ( partition by id_veiculo, servico order by From 2003807d17cbcdd7e23454bc5ea9eebed7a9f551 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 6 Nov 2024 17:47:01 -0300 Subject: [PATCH 09/10] atualiza ref para source com variavel --- queries/models/monitoramento/staging/aux_gps.sql | 2 +- queries/models/monitoramento/staging/aux_realocacao.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/queries/models/monitoramento/staging/aux_gps.sql b/queries/models/monitoramento/staging/aux_gps.sql index a38e8756..fef1ac2d 100644 --- a/queries/models/monitoramento/staging/aux_gps.sql +++ b/queries/models/monitoramento/staging/aux_gps.sql @@ -16,4 +16,4 @@ select datetime(timestamp(timestamp_captura), "America/Sao_Paulo") as datetime ) datetime_captura, safe_cast(velocidade as int64) velocidade -from {{ var("sppo_registros_staging") }} +from {{ source('br_rj_riodejaneiro_onibus_gps_' ~ var("fonte_gps") ~ '_staging', "registros") }} diff --git a/queries/models/monitoramento/staging/aux_realocacao.sql b/queries/models/monitoramento/staging/aux_realocacao.sql index 84558820..a3da81dd 100644 --- a/queries/models/monitoramento/staging/aux_realocacao.sql +++ b/queries/models/monitoramento/staging/aux_realocacao.sql @@ -22,4 +22,4 @@ select safe_cast( datetime(timestamp(timestamp_captura), "America/Sao_Paulo") as datetime ) as datetime_captura, -from {{ var("sppo_realocacao_staging") }} +from {{ source('br_rj_riodejaneiro_onibus_gps_' ~ var("fonte_gps") ~ '_staging', "realocacao") }} From f30a8f8db6e3211f8738d88a9d0f953a1ed9db26 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 6 Nov 2024 18:53:12 -0300 Subject: [PATCH 10/10] add distinct na CTE garagens --- queries/models/monitoramento/staging/aux_gps_parada.sql | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/queries/models/monitoramento/staging/aux_gps_parada.sql b/queries/models/monitoramento/staging/aux_gps_parada.sql index ead59870..5c085c2f 100644 --- a/queries/models/monitoramento/staging/aux_gps_parada.sql +++ b/queries/models/monitoramento/staging/aux_gps_parada.sql @@ -27,8 +27,8 @@ with ), garagens as ( -- 1. Selecionamos as garagens, , criando uma geometria de ponto para cada. - select - st_geogpoint(stop_lon, stop_lat) as ponto_parada, + select distinct + st_astext(st_geogpoint(stop_lon, stop_lat)) as ponto_parada, stop_name as nome_parada, 'garagens' as tipo_parada -- from {{ ref("stops_gtfs") }} @@ -40,7 +40,7 @@ with where pickup_type is null and drop_off_type is null - and stop_name like "Garagem%" + and stop_name like "%Garagem%" and feed_start_date = date("{{ feed_start_date }}") ), pontos_parada as ( @@ -48,7 +48,8 @@ with select * from terminais union all - select * + select st_geogfromtext(ponto_parada) as ponto_parada, + * except(ponto_parada) from garagens ), distancia as (