Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bilhetagem - Altera incremental strategy do modelo integracao.sql para insert_overwrite #375

Merged
merged 23 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b343bba
cria nova validação de integrações
pixuimpou Dec 23, 2024
64b6d66
add data_inicio_matriz
pixuimpou Dec 27, 2024
baadad8
cria tabela matriz_integracao com base na matriz da smtr
pixuimpou Dec 27, 2024
f3d1bf5
corrige integração com transação duplicada
pixuimpou Dec 27, 2024
141c079
cria validação das integrações usando a matriz da smtr
pixuimpou Dec 27, 2024
a681356
cria flow de materialização da nova validação dos dados da Jaé
pixuimpou Dec 27, 2024
554cdf2
schedule_cron opcional
pixuimpou Dec 27, 2024
c9495b8
cria materialização da matriz de integração da smtr
pixuimpou Dec 27, 2024
5acda00
add matriz_integracao_smtr / add validacao_dados_jae
pixuimpou Dec 27, 2024
2503c43
Merge branch 'main' into staging/jae-integracao-matriz-smtr
mergify[bot] Dec 27, 2024
1775ca3
add validacao_dados_jae
pixuimpou Dec 27, 2024
46d70ce
Merge branch 'staging/jae-integracao-matriz-smtr' of https://github.c…
pixuimpou Dec 27, 2024
655c68a
wait integracao e transacao
pixuimpou Dec 27, 2024
db5cb25
add change log pr 371
pixuimpou Dec 27, 2024
bef10dc
ajusta schedule
pixuimpou Dec 27, 2024
12b3a04
corrige mensagem de erro
pixuimpou Dec 30, 2024
27f4f36
descomenta refs
pixuimpou Dec 30, 2024
ea2c7d9
altera incremental strategy da integracao para insert_overwrite
pixuimpou Jan 2, 2025
60fce60
link pr
pixuimpou Jan 2, 2025
8105a76
atualiza dbt
pixuimpou Jan 2, 2025
85e61df
Merge branch 'main' into staging/jae-integracao-matriz-smtr
pixuimpou Jan 2, 2025
74294e3
corrige ref
pixuimpou Jan 2, 2025
ffe18e1
Merge branch 'staging/jae-integracao-matriz-smtr' of https://github.c…
pixuimpou Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Flows for br_rj_riodejaneiro_bilhetagem

DBT: 2024-12-12
DBT: 2025-01-02
"""

from copy import deepcopy
Expand Down
6 changes: 6 additions & 0 deletions queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - bilhetagem

## [2.0.3] - 2025-01-02

### Corrigido

- Altera incremental strategy do modelo `integracao.sql` para `insert_overwrite` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/375)

## [2.0.2] - 2024-12-30

### Removido
Expand Down
107 changes: 75 additions & 32 deletions queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,50 @@
{{
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
partition_by={"field": "data", "data_type": "date", "granularity": "day"},
unique_key="id_transacao",
)
}}

{% set incremental_filter %}
date(data) between date("{{var('date_range_start')}}") and date(
"{{var('date_range_end')}}"
)
and timestamp_captura
between datetime("{{var('date_range_start')}}") and datetime(
"{{var('date_range_end')}}"
)
{% endset %}

{% set integracao_staging = ref("staging_integracao_transacao") %}

{% if execute %}
{% if is_incremental() %}
{% set integracao_partitions_query %}

SELECT DISTINCT
CONCAT("'", DATE(data_transacao), "'") AS data_transacao
FROM
{{ integracao_staging }},
UNNEST([
data_transacao_t0,
data_transacao_t1,
data_transacao_t2,
data_transacao_t3,
data_transacao_t4
]) AS data_transacao
WHERE
{{ incremental_filter }}

{% endset %}

{% set integracao_partitions = (
run_query(integracao_partitions_query).columns[0].values()
) %}

{% endif %}
{% endif %}

with
integracao_transacao_deduplicada as (
select * except (rn)
Expand All @@ -17,19 +56,9 @@ with
row_number() over (
partition by id order by timestamp_captura desc
) as rn
from
{{ ref("staging_integracao_transacao") }}
{# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.integracao_transacao` #}
{% if is_incremental() -%}
where
date(data) between date("{{var('date_range_start')}}") and date(
"{{var('date_range_end')}}"
)
and timestamp_captura
between datetime("{{var('date_range_start')}}") and datetime(
"{{var('date_range_end')}}"
)
{%- endif %}
from {{ integracao_staging }}
{# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.integracao_transacao` #}
{% if is_incremental() -%} where {{ incremental_filter }} {%- endif %}
)
where rn = 1
),
Expand Down Expand Up @@ -77,7 +106,7 @@ with
]
) as im
),
integracao_rn as (
integracao_new as (
select
i.data,
i.hora,
Expand Down Expand Up @@ -109,39 +138,53 @@ with
i.valor_transacao,
i.valor_transacao_total,
i.texto_adicional,
'{{ var("version") }}' as versao,
row_number() over (
partition by id_integracao, id_transacao
order by datetime_processamento_integracao desc
) as rn
'{{ var("version") }}' as versao
from integracao_melt i
left join
{{ source("cadastro", "modos") }} m
on i.id_tipo_modal = m.id_modo
and m.fonte = "jae"
left join
{{ ref("operadoras") }} do on i.id_operadora = do.id_operadora_jae
{# `rj-smtr.cadastro.operadoras` do on i.id_operadora = do.id_operadora_jae #}
left join
{{ ref("consorcios") }} dc on i.id_consorcio = dc.id_consorcio_jae
{# `rj-smtr.cadastro.consorcios` dc on i.id_consorcio = dc.id_consorcio_jae #}
left join {{ ref("operadoras") }} do on i.id_operadora = do.id_operadora_jae
{# `rj-smtr.cadastro.operadoras` do on i.id_operadora = do.id_operadora_jae #}
left join {{ ref("consorcios") }} dc on i.id_consorcio = dc.id_consorcio_jae
{# `rj-smtr.cadastro.consorcios` dc on i.id_consorcio = dc.id_consorcio_jae #}
left join
{{ ref("staging_linha") }} l
{# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.linha` l #}
on i.id_linha = l.cd_linha
where i.id_transacao is not null
),
complete_partitions as (
select *, 0 as priority
from integracao_new

{% if is_incremental() %}
union all

select *, 1 as priority
from {{ this }}
where
{% if integracao_partitions | length > 0 %}
data in ({{ integracao_partitions | join(", ") }})
{% else %} data = "2000-01-01"
{% endif %}
{% endif %}
),
integracoes_teste_invalidas as (
select distinct i.id_integracao
from integracao_rn i
from complete_partitions i
left join
{{ ref("staging_linha_sem_ressarcimento") }} l
{# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.linha_sem_ressarcimento` l #}
on i.id_servico_jae = l.id_linha
where l.id_linha is not null or i.data < "2023-07-17"
)
select * except (rn)
from integracao_rn
where
rn = 1
and id_integracao not in (select id_integracao from integracoes_teste_invalidas)
select * except (priority)
from complete_partitions
where id_integracao not in (select id_integracao from integracoes_teste_invalidas)
qualify
row_number() over (
partition by id_integracao, id_transacao, priority
order by datetime_processamento_integracao desc
)
= 1
Loading