Commit

Merge branch 'staging/transacao-2-0' of https://github.com/prefeitura-rio/pipelines_rj_smtr into staging/transacao-2-0
pixuimpou committed Jan 8, 2025
2 parents 6904e39 + 2da6a72 commit 83f79fd
Showing 18 changed files with 563 additions and 367 deletions.
2 changes: 1 addition & 1 deletion pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
@@ -2,7 +2,7 @@
"""
Flows for br_rj_riodejaneiro_bilhetagem
DBT: 2024-12-12
DBT: 2025-01-06
"""

from copy import deepcopy
15 changes: 15 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
@@ -1,5 +1,20 @@
# Changelog - gtfs

## [1.2.0] - 2025-01-03

### Added
- Added a 5-minute schedule to the GTFS capture flow (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379) (see the sketch after this entry)

- Added custom execution parameters in `flows.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379)

### Removed
- Removed the test for the number of tabs in the Ordem de Serviço spreadsheet (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/379)
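For reference, a minimal sketch of the pattern behind the two "Added" items, mirroring the flows.py hunks further below (assumptions: Prefect 1.x; the flow name, Docker image, and the inline IntervalSchedule are illustrative stand-ins for the repository's constants and its pipelines.schedules.every_5_minutes):

# Minimal sketch, not the repository code: attach a 5-minute schedule and
# Kubernetes resource settings to a Prefect 1.x flow.
from datetime import timedelta

from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.schedules import IntervalSchedule

# Stand-in for pipelines.schedules.every_5_minutes
every_5_minutes = IntervalSchedule(interval=timedelta(minutes=5))

with Flow("gtfs_captura_sketch") as flow:  # illustrative flow name
    ...  # capture tasks would be defined here

# Execution parameters for the Kubernetes agent (values as in the diff below)
flow.run_config = KubernetesRun(
    image="example-image:latest",  # placeholder for constants.DOCKER_IMAGE.value
    cpu_limit="1000m",
    memory_limit="4600Mi",
    cpu_request="500m",
    memory_request="1000Mi",
)
flow.schedule = every_5_minutes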

## [1.1.9] - 2025-01-02

### Changed
- Removed the mileage verification test from the `processa_ordem_servico` task (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/377)

## [1.1.8] - 2024-12-30

### Changed
9 changes: 6 additions & 3 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -45,8 +45,7 @@
upload_raw_data_to_gcs,
upload_staging_data_to_gcs,
)

# from pipelines.schedules import every_5_minutes
from pipelines.schedules import every_5_minutes
from pipelines.tasks import (
check_fail,
get_scheduled_timestamp,
@@ -287,13 +286,17 @@
gtfs_captura_nova.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_SMTR_AGENT_LABEL.value],
cpu_limit="1000m",
memory_limit="4600Mi",
cpu_request="500m",
memory_request="1000Mi",
)
gtfs_captura_nova.state_handlers = [
handler_inject_bd_credentials,
handler_initialize_sentry,
handler_skip_if_running,
]
# gtfs_captura_nova.schedule = every_5_minutes
gtfs_captura_nova.schedule = every_5_minutes


# with Flow(
35 changes: 20 additions & 15 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py
@@ -204,8 +204,13 @@ def processa_ordem_servico(
None
"""

if len(sheetnames) != 3 and regular_sheet_index is None:
raise Exception("More than 3 tabs in the file. Please specify the regular sheet index.")
if (
len([sheet for sheet in sheetnames if "ANEXO I:" in sheet]) != 1
and regular_sheet_index is None
):
raise Exception(
"More than 1 regular sheet in the file. Please specify the regular sheet index."
)

if regular_sheet_index is None:
regular_sheet_index = next(
@@ -307,19 +312,19 @@ def processa_ordem_servico(
if not all_columns_present or not no_duplicate_columns:
raise Exception("Missing or duplicated columns in ordem_servico")

quadro_test = quadro_geral.copy()
quadro_test["km_test"] = round(
(quadro_geral["partidas_volta_du"] * quadro_geral["extensao_volta"])
+ (quadro_geral["partidas_ida_du"] * quadro_geral["extensao_ida"]),
2,
)
quadro_test["dif"] = quadro_test["km_test"] - quadro_test["km_dia_util"]

if not (
round(abs(quadro_test["dif"].max()), 2) <= 0.01
and round(abs(quadro_test["dif"].min()), 2) <= 0.01
):
raise Exception("failed to validate km_test and km_dia_util")
# quadro_test = quadro_geral.copy()
# quadro_test["km_test"] = round(
# (quadro_geral["partidas_volta_du"] * quadro_geral["extensao_volta"])
# + (quadro_geral["partidas_ida_du"] * quadro_geral["extensao_ida"]),
# 2,
# )
# quadro_test["dif"] = quadro_test["km_test"] - quadro_test["km_dia_util"]

# if not (
# round(abs(quadro_test["dif"].max()), 2) <= 0.01
# and round(abs(quadro_test["dif"].min()), 2) <= 0.01
# ):
# raise Exception("failed to validate km_test and km_dia_util")

local_file_path = list(filter(lambda x: "ordem_servico" in x, local_filepath))[0]
quadro_geral_csv = quadro_geral.to_csv(index=False)
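
For context, the validation added above keys on sheet names containing "ANEXO I:". A self-contained sketch of that selection logic (an assumed, illustrative helper; the repository function receives the sheet names as a parameter, and openpyxl is used here only to make the example runnable):

# Illustrative helper, not from the repository: locate the single "ANEXO I:" sheet
# of an Ordem de Serviço workbook, mirroring the check added above.
from typing import Optional

from openpyxl import load_workbook


def find_regular_sheet_index(xlsx_path: str, regular_sheet_index: Optional[int] = None) -> int:
    sheetnames = load_workbook(xlsx_path, read_only=True).sheetnames
    matches = [i for i, name in enumerate(sheetnames) if "ANEXO I:" in name]
    if len(matches) != 1 and regular_sheet_index is None:
        raise Exception(
            "More than 1 regular sheet in the file. Please specify the regular sheet index."
        )
    return matches[0] if regular_sheet_index is None else regular_sheet_index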
6 changes: 6 additions & 0 deletions pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog - projeto_subsidio_sppo

## [1.0.7] - 2025-01-03

### Added

- Added materialization of the `monitoramento` dataset models to the subsidy flow (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/372) (see the sketch below)
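
A minimal sketch of the chaining pattern used in the flows.py diff below (assumptions: Prefect 1.x; run_dbt_selector here is a simplified stand-in for the repository task of the same name, and the first selector name and the variable values are illustrative):

# Minimal sketch, not the repository implementation: materialize the monitoramento
# models only after the main subsidy dbt run has finished.
from prefect import Flow, task


@task
def run_dbt_selector(selector_name: str, _vars: dict) -> str:
    # Stand-in: the real task runs `dbt run --selector <name> --vars <...>`.
    return f"dbt run --selector {selector_name} --vars '{_vars}'"


with Flow("subsidio_sppo_apuracao_sketch") as flow:
    dbt_vars = {"run_date": "2025-01-01"}  # illustrative variables
    apuracao_run = run_dbt_selector(selector_name="apuracao_subsidio", _vars=dbt_vars)
    run_dbt_selector(
        selector_name="monitoramento_subsidio",
        _vars=dbt_vars | {"tipo_materializacao": "monitoramento"},
        upstream_tasks=[apuracao_run],
    )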

## [1.0.6] - 2024-12-17

### Added
20 changes: 19 additions & 1 deletion pipelines/migration/projeto_subsidio_sppo/flows.py
@@ -3,7 +3,7 @@
"""
Flows for projeto_subsidio_sppo
DBT: 2024-12-20
DBT: 2025-01-06
"""

from prefect import Parameter, case, task
@@ -275,6 +275,18 @@
_vars=dbt_vars_2,
upstream_tasks=[dbt_vars_2],
)
dbt_vars_3 = get_join_dict(
dict_list=[dbt_vars_1],
new_dict=date_intervals["second_range"]
| {"tipo_materializacao": "monitoramento"},
upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN_2],
)[0]

SUBSIDIO_SPPO_APURACAO_RUN_3 = run_dbt_selector(
selector_name="monitoramento_subsidio",
_vars=dbt_vars_3,
upstream_tasks=[dbt_vars_3],
)

# POST-DATA QUALITY CHECK #
SUBSIDIO_SPPO_DATA_QUALITY_POS_2 = run_dbt_tests(
@@ -323,6 +335,11 @@
_vars=_vars,
)

SUBSIDIO_SPPO_APURACAO_RUN_2 = run_dbt_selector(
selector_name="monitoramento_subsidio",
_vars=_vars | {"tipo_materializacao": "monitoramento"},
upstream_tasks=[SUBSIDIO_SPPO_APURACAO_RUN],
)
# POST-DATA QUALITY CHECK #
SUBSIDIO_SPPO_DATA_QUALITY_POS = run_dbt_tests(
dataset_id="viagens_remuneradas sumario_servico_dia_pagamento",
@@ -373,6 +390,7 @@
dataset_id="sppo_registros sppo_realocacao check_gps_treatment__gps_sppo sppo_veiculo_dia", # noqa
_vars=dbt_vars,
)

DATA_QUALITY_PRE = dbt_data_quality_checks(
dbt_logs=SUBSIDIO_SPPO_DATA_QUALITY_PRE,
checks_list={},
1 change: 1 addition & 0 deletions queries/dbt_project.yml
@@ -107,6 +107,7 @@ vars:
subsidio_parametros: "`rj-smtr-staging.dashboard_subsidio_sppo_staging.subsidio_parametros`"
shapes_version: "YYYY-MM-DD"
frequencies_version: "YYYY-MM-DD"
tipo_materializacao: "subsidio"
# Feature: infraction penalty for inoperative air conditioning (DECRETO RIO 51940/2023)
DATA_SUBSIDIO_V2_INICIO: "2023-01-16"
# Feature: infraction penalty for safety and cleanliness/equipment (DECRETO RIO 52820/2023)
6 changes: 6 additions & 0 deletions queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog - bilhetagem

## [2.0.3] - 2025-01-02

### Fixed

- Changed the incremental strategy of the `integracao.sql` model to `insert_overwrite` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/375)

## [2.0.2] - 2024-12-30

### Removed
114 changes: 82 additions & 32 deletions queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql
@@ -2,11 +2,57 @@
{{
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
partition_by={"field": "data", "data_type": "date", "granularity": "day"},
unique_key="id_transacao",
)
}}

{% set incremental_filter %}
date(data) between date("{{var('date_range_start')}}") and date(
"{{var('date_range_end')}}"
)
and timestamp_captura
between datetime("{{var('date_range_start')}}") and datetime(
"{{var('date_range_end')}}"
)
{% endset %}

{% set integracao_staging = ref("staging_integracao_transacao") %}

{% if execute %}
{% if is_incremental() %}
{% set integracao_partitions_query %}
WITH integracao AS (
SELECT DISTINCT
CONCAT("'", DATE(data_transacao), "'") AS data_transacao
FROM
{{ integracao_staging }},
UNNEST([
data_transacao_t0,
data_transacao_t1,
data_transacao_t2,
data_transacao_t3,
data_transacao_t4
]) AS data_transacao
WHERE
{{ incremental_filter }}
)
SELECT
*
FROM
integracao
WHERE
data_transacao IS NOT NULL

{% endset %}

{% set integracao_partitions = (
run_query(integracao_partitions_query).columns[0].values()
) %}

{% endif %}
{% endif %}

with
integracao_transacao_deduplicada as (
select * except (rn)
@@ -17,19 +63,9 @@ with
row_number() over (
partition by id order by timestamp_captura desc
) as rn
from
{{ ref("staging_integracao_transacao") }}
{# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.integracao_transacao` #}
{% if is_incremental() -%}
where
date(data) between date("{{var('date_range_start')}}") and date(
"{{var('date_range_end')}}"
)
and timestamp_captura
between datetime("{{var('date_range_start')}}") and datetime(
"{{var('date_range_end')}}"
)
{%- endif %}
from {{ integracao_staging }}
{# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.integracao_transacao` #}
{% if is_incremental() -%} where {{ incremental_filter }} {%- endif %}
)
where rn = 1
),
@@ -77,7 +113,7 @@ with
]
) as im
),
integracao_rn as (
integracao_new as (
select
i.data,
i.hora,
@@ -109,39 +145,53 @@ with
i.valor_transacao,
i.valor_transacao_total,
i.texto_adicional,
'{{ var("version") }}' as versao,
row_number() over (
partition by id_integracao, id_transacao
order by datetime_processamento_integracao desc
) as rn
'{{ var("version") }}' as versao
from integracao_melt i
left join
{{ source("cadastro", "modos") }} m
on i.id_tipo_modal = m.id_modo
and m.fonte = "jae"
left join
{{ ref("operadoras") }} do on i.id_operadora = do.id_operadora_jae
{# `rj-smtr.cadastro.operadoras` do on i.id_operadora = do.id_operadora_jae #}
left join
{{ ref("consorcios") }} dc on i.id_consorcio = dc.id_consorcio_jae
{# `rj-smtr.cadastro.consorcios` dc on i.id_consorcio = dc.id_consorcio_jae #}
left join {{ ref("operadoras") }} do on i.id_operadora = do.id_operadora_jae
{# `rj-smtr.cadastro.operadoras` do on i.id_operadora = do.id_operadora_jae #}
left join {{ ref("consorcios") }} dc on i.id_consorcio = dc.id_consorcio_jae
{# `rj-smtr.cadastro.consorcios` dc on i.id_consorcio = dc.id_consorcio_jae #}
left join
{{ ref("staging_linha") }} l
{# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.linha` l #}
on i.id_linha = l.cd_linha
where i.id_transacao is not null
),
complete_partitions as (
select *, 0 as priority
from integracao_new

{% if is_incremental() %}
union all

select *, 1 as priority
from {{ this }}
where
{% if integracao_partitions | length > 0 %}
data in ({{ integracao_partitions | join(", ") }})
{% else %} data = "2000-01-01"
{% endif %}
{% endif %}
),
integracoes_teste_invalidas as (
select distinct i.id_integracao
from integracao_rn i
from complete_partitions i
left join
{{ ref("staging_linha_sem_ressarcimento") }} l
{# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.linha_sem_ressarcimento` l #}
on i.id_servico_jae = l.id_linha
where l.id_linha is not null or i.data < "2023-07-17"
)
select * except (rn)
from integracao_rn
where
rn = 1
and id_integracao not in (select id_integracao from integracoes_teste_invalidas)
select * except (priority)
from complete_partitions
where id_integracao not in (select id_integracao from integracoes_teste_invalidas)
qualify
row_number() over (
partition by id_integracao, id_transacao, priority
order by datetime_processamento_integracao desc
)
= 1
5 changes: 5 additions & 0 deletions queries/models/monitoramento/CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog - monitoramento

## [1.2.1] - 2025-01-03

### Added
- Created the `monitoramento_viagem_transacao.sql` model (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/372)

## [1.2.0] - 2024-11-28

### Added
26 changes: 26 additions & 0 deletions queries/models/monitoramento/monitoramento_viagem_transacao.sql
@@ -0,0 +1,26 @@
{{
config(
materialized="incremental",
partition_by={"field": "data", "data_type": "date", "granularity": "day"},
incremental_strategy="insert_overwrite",
)
}}

select
data,
id_viagem,
id_veiculo,
id_validador,
servico,
tipo_viagem,
sentido,
distancia_planejada,
quantidade_transacao,
quantidade_transacao_riocard,
percentual_estado_equipamento_aberto,
indicador_estado_equipamento_aberto,
datetime_partida_bilhetagem,
datetime_partida,
datetime_chegada,
datetime_ultima_atualizacao
from {{ ref('viagem_transacao_aux') }}