Skip to content

Commit

Permalink
Alterações para reprocessamento
Browse files Browse the repository at this point in the history
  • Loading branch information
vtr363 committed Nov 9, 2024
1 parent bbbdf33 commit 51fc755
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 115 deletions.
18 changes: 9 additions & 9 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ vars:
trips_staging: "rj-smtr-staging.br_rj_riodejaneiro_sigmob_staging.trips"

## GTFS
data_versao_gtfs: "2024-05-03" # fixada última data disponível
data_versao_gtfs: "2024-09-29" # fixada última data disponível

### Subsídio SPPO (Ônibus) ###
buffer: 500 # distância em metros para buffer
Expand Down Expand Up @@ -197,10 +197,10 @@ models:
rj_smtr:
projeto_subsidio_sppo:
+materialized: view
+schema: projeto_subsidio_sppo
+schema: victor__projeto_subsidio_sppo
deprecated:
+materialized: view
+schema: projeto_subsidio_sppo
+schema: victor__projeto_subsidio_sppo
br_rj_riodejaneiro_sigmob:
+materialized: view
+schema: br_rj_riodejaneiro_sigmob
Expand All @@ -218,10 +218,10 @@ models:
+schema: br_rj_riodejaneiro_veiculos
dashboard_subsidio_sppo:
+materialized: view
+schema: dashboard_subsidio_sppo
+schema: victor__dashboard_subsidio_sppo
dashboard_subsidio_sppo_staging:
+materialized: view
+schema: dashboard_subsidio_sppo_staging
+schema: victor__dashboard_subsidio_sppo_staging
veiculo:
+materialized: view
+schema: veiculo
Expand Down Expand Up @@ -300,18 +300,18 @@ models:
financeiro:
+materialized: incremental
+incremental_strategy: insert_overwrite
+schema: financeiro
+schema: victor__financeiro
staging:
+materialized: incremental
+incremental_strategy: insert_overwrite
+schema: financeiro_staging
+schema: victor__financeiro_staging
dashboard_subsidio_sppo_v2:
+materialized: view
+schema: dashboard_subsidio_sppo_v2
+schema: victor__dashboard_subsidio_sppo_v2
subsidio:
+materialized: incremental
+incremental_strategy: insert_overwrite
+schema: subsidio
+schema: victor__subsidio
catalogo:
+materialized: view
+schema: catalogo
Expand Down
217 changes: 212 additions & 5 deletions queries/dev/run.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,218 @@
# -*- coding: utf-8 -*-
# import os
import os
from typing import Dict, List, Union

from utils import run_dbt_model

# See utils.py for the parameters accepted by run_dbt_model
def run_dbt_tests(
    dataset_id: str = None,
    table_id: str = None,
    model: str = None,
    upstream: bool = None,
    downstream: bool = None,
    exclude: str = None,
    flags: str = None,
    _vars: Union[dict, List[Dict]] = None,
):
    """
    Build a ``dbt test`` command line, print it and run it from the project root.

    Args:
        dataset_id: First part of the selector, used only when ``model`` is
            not given.
        table_id: Joined to ``dataset_id`` as ``dataset_id.table_id`` when
            ``model`` is not given. If ``dataset_id`` is missing, ``table_id``
            is used as the selector on its own.
        model: Explicit value for ``--select``. When set, ``dataset_id`` and
            ``table_id`` are ignored.
        upstream: When truthy, prefixes the selector with ``+``.
        downstream: When truthy, suffixes the selector with ``+``.
        exclude: Passed verbatim to ``--exclude``.
        flags: Extra command-line flags, appended after
            ``--profiles-dir ./dev`` (which is always present).
        _vars: A dict, or a list of dicts merged in order (later keys win).
            Its ``str()`` form is wrapped in double quotes and passed to
            ``--vars``.

    The working directory is changed to the parent of this file's directory
    and the command is executed with ``os.system``; its exit status is not
    inspected.

    NOTE: the command is assembled by plain string concatenation and run by
    the shell, so arguments are not escaped -- pass trusted values only.
    """
    run_command = "dbt test"

    # The dev profiles directory is always passed; caller flags follow it.
    common_flags = "--profiles-dir ./dev"
    flags = f"{common_flags} {flags}" if flags else common_flags

    if not model:
        # Join only the parts that were provided. Previously a lone table_id
        # crashed with ``None += str`` because dataset_id was missing.
        model = ".".join(part for part in (dataset_id, table_id) if part)

    if model:
        run_command += " --select "
        if upstream:
            run_command += "+"
        run_command += model
        if downstream:
            run_command += "+"

    if exclude:
        run_command += f" --exclude {exclude}"

    if _vars:
        if isinstance(_vars, list):
            # Collapse the list of dicts into one mapping; later entries win.
            merged_vars = {}
            for elem in _vars:
                merged_vars.update(elem)
            _vars = merged_vars
        run_command += f' --vars "{_vars}"'

    # flags always holds at least the common flags at this point.
    run_command += f" {flags}"

    print(f"\n>>> RUNNING: {run_command}\n")

    # Run from the directory one level above this file's directory.
    project_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    os.chdir(project_dir)
    os.system(run_command)


from utils import run_dbt_model
from datetime import datetime, timedelta

# ordem_servico_trips_shapes_gtfs ##
# Each keyword may be passed only once: the placeholder
# example/my_first_dbt_model values are dropped in favour of the gtfs ones.
run_dbt_model(
    dataset_id="gtfs",
    # dataset_id="planejamento",
    table_id="ordem_servico_trips_shapes_gtfs",
    # upstream=True,
    _vars={"data_versao_gtfs": "2024-11-06"},
    flags="--target hmg",
)

## dados de autuação ##
# run_dbt_model(
# dataset_id="transito",
# table_id="receita_autuacao",
# # upstream=True,
# # _vars={"date_range_start": "2019-01-01", "date_range_end": "2023-08-26"},
# flags="--full-refresh",
# )

# dados viagens ##
# run_dbt_model(
# dataset_id="projeto_subsidio_sppo",
# table_id="viagem_completa",
# upstream=True,
# exclude="+gps_sppo +ordem_servico_trips_shapes_gtfs",
# _vars={"run_date": "2024-11-07"},
# flags="--target hmg",
# )

## loop para dados de viagens em D+1 ##

# data_inicial = datetime.strptime("2024-11-07", "%Y-%m-%d")
# data_final = datetime.strptime("2024-11-08", "%Y-%m-%d")

# data_atual = data_inicial
# while data_atual <= data_final:
# run_dbt_model(
# dataset_id="projeto_subsidio_sppo",
# table_id="viagem_completa",
# upstream=True,
# exclude="+gps_sppo +ordem_servico_trips_shapes_gtfs",
# _vars={"run_date": data_atual.strftime("%Y-%m-%d")},
# # flags="--full-refresh",
# flags="--target hmg",
# )
# print(data_atual.strftime("%Y-%m-%d"))
# data_atual += timedelta(days=1)

# data_inicial = datetime.strptime("2024-10-02", "%Y-%m-%d")
# data_final = datetime.strptime("2024-10-16", "%Y-%m-%d")

# data_atual = data_inicial
# while data_atual <= data_final:
# run_dbt_model(
# dataset_id="projeto_subsidio_sppo",
# table_id="viagem_completa",
# upstream=True,
# exclude="+gps_sppo +ordem_servico_trips_shapes_gtfs",
# _vars={"run_date": data_atual.strftime("%Y-%m-%d")},
# # flags="--full-refresh",
# flags="--target hmg",
# )
# print(data_atual.strftime("%Y-%m-%d"))
# data_atual += timedelta(days=1)

## dados subsidio ##
# run_dbt_model(
# dataset_id=" subsidio dashboard_subsidio_sppo",
# _vars={"start_date": "2024-07-16", "end_date": "2024-07-31"},
# # flags="--full-refresh",
# )
# run_dbt_model(
# dataset_id="dashboard_subsidio_sppo",
# table_id="sumario_servico_dia_historico",
# _vars={"start_date": "2024-07-16", "end_date": "2024-07-17"},
# # flags="--full-refresh",
# )
# run_dbt_model(
# dataset_id="subsidio",
# # table_id="sumario_servico_dia_tipo_sem_glosa",
# _vars={"start_date": "2024-07-20", "end_date": "2024-07-20"},
# flags="--full-refresh",
# )


## Teste de modelos ##
# run_dbt_tests( # ok
# dataset_id="br_rj_riodejaneiro_onibus_gps",
# table_id="sppo_registros sppo_realocacao",
# _vars={"start_timestamp": "2024-09-01 00:00:00", "end_timestamp": "2024-09-15 03:00:00"},
# flags="--target hmg",
# )
# run_dbt_tests( # ok
# dataset_id="br_rj_riodejaneiro_veiculos",
# table_id="gps_sppo",
# _vars={"start_timestamp": "2024-09-01 00:00:00", "end_timestamp": "2024-09-15 03:00:00"},
# flags="--target dev",
# )
# run_dbt_tests( # ok
# dataset_id="veiculo",
# table_id="sppo_veiculo_dia",
# _vars={"start_timestamp": "2024-09-01 00:00:00", "end_timestamp": "2024-09-15 00:00:00"},
# flags="--target dev",
# )
# run_dbt_tests( # ok
# dataset_id="dashboard_subsidio_sppo",
# table_id="viagens_remuneradas",
# _vars={"start_timestamp": "2024-10-06 00:00:00", "end_timestamp": "2024-10-06 03:00:00"},
# flags="--target hmg",
# )
# run_dbt_tests( # ok
# dataset_id="dashboard_subsidio_sppo_v2",
# table_id="sumario_servico_dia_pagamento",
# _vars={"start_timestamp": "2024-10-06 00:00:00", "end_timestamp": "2024-10-06 03:00:00"},
# flags="--target hmg",
# )
# run_dbt_tests( # ok
# dataset_id="dashboard_subsidio_sppo",
# table_id="sumario_servico_dia_historico",
# _vars={"start_timestamp": "2024-09-01 00:00:00", "end_timestamp": "2024-09-15 00:00:00"},
# flags="--target dev",
# )
# run_dbt_tests( # ok
# dataset_id="dashboard_subsidio_sppo",
# table_id="sumario_servico_dia",
# _vars={"start_timestamp": "2024-07-19 00:00:00", "end_timestamp": "2024-07-20 00:00:00"},
# )
# run_dbt_tests( # ok
# dataset_id="dashboard_subsidio_sppo",
# table_id="sumario_servico_dia_tipo_sem_glosa",
# _vars={"start_timestamp": "2024-07-19 00:00:00", "end_timestamp": "2024-07-19 00:00:00"},
# )
# run_dbt_tests( # ok
# dataset_id="dashboard_subsidio_sppo",
# table_id="sumario_servico_dia_tipo",
# _vars={"start_timestamp": "2024-07-19 00:00:00", "end_timestamp": "2024-07-19 00:00:00"},
# )


## Selector apuração ##
# run_command = """dbt run --selector apuracao_subsidio_v9 --vars "{'start_date': '2024-10-01', 'end_date': '2024-10-05'}" -x --profiles-dir ./dev --target hmg"""

# project_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# os.chdir(project_dir)
# os.system(run_command)

# run_command = """dbt run --selector apuracao_subsidio_v9 --vars "{'start_date': '2024-10-06', 'end_date': '2024-10-06'}" -x --profiles-dir ./dev --target hmg"""

# project_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# os.chdir(project_dir)
# os.system(run_command)
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ SELECT
FROM
servico_km_apuracao AS s
LEFT JOIN
{{ ref("valor_tipo_penalidade") }} AS p
-- `rj-smtr`.`dashboard_subsidio_sppo`.`valor_tipo_penalidade` AS p
-- {{ ref("valor_tipo_penalidade") }} AS p
`rj-smtr`.`dashboard_subsidio_sppo`.`valor_tipo_penalidade` AS p
ON
s.data BETWEEN p.data_inicio
AND p.data_fim
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ WITH
status,
SAFE_CAST(JSON_VALUE(indicadores,"$.indicador_ar_condicionado") AS BOOL) AS indicador_ar_condicionado
FROM
{{ ref("sppo_veiculo_dia") }}
-- rj-smtr.veiculo.sppo_veiculo_dia
-- {{ ref("sppo_veiculo_dia") }}
rj-smtr.veiculo.sppo_veiculo_dia
WHERE
data BETWEEN DATE("{{ var("start_date") }}")
AND DATE("{{ var("end_date") }}")
Expand Down
4 changes: 2 additions & 2 deletions queries/models/financeiro/subsidio_penalidade_servico_dia.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ WITH
perc_km_superior,
IFNULL(-valor, 0) AS valor_penalidade
FROM
{{ ref("valor_tipo_penalidade") }}
-- rj-smtr.dashboard_subsidio_sppo.valor_tipo_penalidade
-- {{ ref("valor_tipo_penalidade") }}
rj-smtr.dashboard_subsidio_sppo.valor_tipo_penalidade
)
SELECT
s.data,
Expand Down
6 changes: 3 additions & 3 deletions queries/models/gtfs/ordem_servico_trips_shapes_gtfs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ with
and (
(
o.tipo_dia = t.tipo_dia
and o.tipo_os not in ("CNU", "Eleição")
and o.tipo_os not in ("CNU", "Eleição", "Enem")
)
or (
o.tipo_dia = "Ponto Facultativo"
Expand Down Expand Up @@ -222,8 +222,8 @@ select
from ordem_servico_trips as o
left join shapes as s using (feed_version, feed_start_date, shape_id)
left join
{{ ref("ordem_servico_faixa_horaria") }} as fh
-- `rj-smtr.planejamento.ordem_servico_faixa_horaria` as fh
{{ ref("ordem_servico_faixa_horaria") }} AS fh
-- `rj-smtr.planejamento.ordem_servico_faixa_horaria` as fh
using (
feed_version, feed_start_date, tipo_os, tipo_dia, servico
)
Expand Down
Loading

0 comments on commit 51fc755

Please sign in to comment.