From ee19094a700d694aa76c905e9797a4af871c35cf Mon Sep 17 00:00:00 2001 From: Rafael Carvalho Pinheiro <74972217+pixuimpou@users.noreply.github.com> Date: Mon, 30 Dec 2024 13:20:23 -0300 Subject: [PATCH] =?UTF-8?q?Cria=20valida=C3=A7=C3=A3o=20dos=20dados=20de?= =?UTF-8?q?=20integra=C3=A7=C3=A3o=20da=20Ja=C3=A9=20com=20base=20na=20Mat?= =?UTF-8?q?riz=20da=20SMTR=20(#371)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cria nova validação de integrações * add data_inicio_matriz * cria tabela matriz_integracao com base na matriz da smtr * corrige integração com transação duplicada * cria validação das integrações usando a matriz da smtr * cria flow de materialização da nova validação dos dados da Jaé * schedule_cron opcional * cria materialização da matriz de integração da smtr * add matriz_integracao_smtr / add validacao_dados_jae * add validacao_dados_jae * wait integracao e transacao * add change log pr 371 * ajusta schedule * corrige mensagem de erro * descomenta refs --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- pipelines/flows.py | 1 + .../CHANGELOG.md | 5 + .../br_rj_riodejaneiro_bilhetagem/flows.py | 5 +- .../br_rj_riodejaneiro_gtfs/CHANGELOG.md | 4 + .../br_rj_riodejaneiro_gtfs/flows.py | 3 +- pipelines/treatment/datario/CHANGELOG.md | 6 + pipelines/treatment/datario/constants.py | 2 - pipelines/treatment/planejamento/CHANGELOG.md | 6 + pipelines/treatment/planejamento/constants.py | 5 + pipelines/treatment/planejamento/flows.py | 7 + pipelines/treatment/templates/utils.py | 6 +- .../validacao_dados_jae/CHANGELOG.md | 7 + .../treatment/validacao_dados_jae/__init__.py | 0 .../validacao_dados_jae/constants.py | 22 + .../treatment/validacao_dados_jae/flows.py | 46 ++ .../CHANGELOG.md | 9 + .../integracao.sql | 285 ++++++----- .../matriz_integracao.sql | 46 -- .../br_rj_riodejaneiro_bilhetagem/schema.yml | 19 +- .../transacao.sql | 6 + queries/models/docs.md | 4 + queries/models/planejamento/CHANGELOG.md | 6 + .../models/planejamento/matriz_integracao.sql | 111 +++++ queries/models/planejamento/schema.yml | 42 ++ queries/models/sources.yml | 5 + .../models/validacao_dados_jae/CHANGELOG.md | 7 + .../integracao_invalida.sql | 188 +++++--- .../integracao_nao_realizada.sql | 446 +++++++++--------- queries/models/validacao_dados_jae/schema.yml | 51 +- queries/selectors.yml | 17 +- 30 files changed, 866 insertions(+), 501 deletions(-) create mode 100644 pipelines/treatment/validacao_dados_jae/CHANGELOG.md create mode 100644 pipelines/treatment/validacao_dados_jae/__init__.py create mode 100644 pipelines/treatment/validacao_dados_jae/constants.py create mode 100644 pipelines/treatment/validacao_dados_jae/flows.py delete mode 100644 queries/models/br_rj_riodejaneiro_bilhetagem/matriz_integracao.sql create mode 100644 queries/models/planejamento/matriz_integracao.sql diff --git a/pipelines/flows.py b/pipelines/flows.py index 7f9448767..f422d1cf1 100644 --- a/pipelines/flows.py +++ b/pipelines/flows.py @@ -26,3 +26,4 @@ from pipelines.treatment.datario.flows import * # noqa from pipelines.treatment.monitoramento.flows import * # noqa from pipelines.treatment.planejamento.flows import * # noqa +from pipelines.treatment.validacao_dados_jae.flows import * # noqa diff --git a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md index eaa4262b9..288039edb 100644 --- a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md +++ b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog - br_rj_riodejaneiro_bilhetagem +## [1.4.9] - 2024-12-30 + +### Alterado +- Desativa schedule do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) + ## [1.4.8] - 2024-12-16 ### Alterado diff --git a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py index 4c60accc4..41e9d9cb1 100644 --- a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py @@ -33,10 +33,9 @@ rename_current_flow_run_now_time, ) from pipelines.migration.utils import set_default_parameters -from pipelines.schedules import ( +from pipelines.schedules import ( # every_day_hour_seven, every_5_minutes, every_day_hour_five, - every_day_hour_seven, every_hour, every_minute, ) @@ -354,7 +353,7 @@ handler_skip_if_running, ] -bilhetagem_validacao_jae.schedule = every_day_hour_seven +# bilhetagem_validacao_jae.schedule = every_day_hour_seven # RECAPTURA # diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md index 2e423bb3f..37b2f288f 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog - gtfs +## [1.1.8] - 2024-12-30 + +### Alterado +- Exclui modelo `matriz_integracao` da materialização (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) ## [1.1.7] - 2024-12-13 diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py index a3e7f3838..68c53aa6e 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py @@ -238,7 +238,8 @@ + " " + constants.PLANEJAMENTO_MATERIALIZACAO_DATASET_ID.value, _vars=dbt_vars, - exclude="calendario aux_calendario_manual viagem_planejada_planejamento", + exclude="calendario aux_calendario_manual viagem_planejada_planejamento \ +matriz_integracao", ).set_upstream(task=wait_captura) run_dbt_failed = task_value_is_none(wait_run_dbt_model) diff --git a/pipelines/treatment/datario/CHANGELOG.md b/pipelines/treatment/datario/CHANGELOG.md index 200360ed6..93feacbc8 100644 --- a/pipelines/treatment/datario/CHANGELOG.md +++ b/pipelines/treatment/datario/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - datario +## [1.0.1] - 2024-12-30 + +### Alterado + +- Remove parâmetro `schedule_cron` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) + ## [1.0.0] - 2024-12-16 ### Adicionado diff --git a/pipelines/treatment/datario/constants.py b/pipelines/treatment/datario/constants.py index 17824f115..d11d352b6 100644 --- a/pipelines/treatment/datario/constants.py +++ b/pipelines/treatment/datario/constants.py @@ -6,7 +6,6 @@ from datetime import datetime from enum import Enum -from pipelines.schedules import create_daily_cron from pipelines.treatment.templates.utils import DBTSelector @@ -17,6 +16,5 @@ class constants(Enum): # pylint: disable=c0103 DATARIO_SELECTOR = DBTSelector( name="datario", - schedule_cron=create_daily_cron(hour=0), initial_datetime=datetime(2024, 12, 16), ) diff --git a/pipelines/treatment/planejamento/CHANGELOG.md b/pipelines/treatment/planejamento/CHANGELOG.md index 9d467be29..d5a9e42a5 100644 --- a/pipelines/treatment/planejamento/CHANGELOG.md +++ b/pipelines/treatment/planejamento/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - planejamento +## [1.1.0] - 2024-12-30 + +### Adicionado + +- Cria flow de materialização da matriz de integração da SMTR (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) + ## [1.0.1] - 2024-11-25 ### Alterado diff --git a/pipelines/treatment/planejamento/constants.py b/pipelines/treatment/planejamento/constants.py index d83a73334..8985b0241 100644 --- a/pipelines/treatment/planejamento/constants.py +++ b/pipelines/treatment/planejamento/constants.py @@ -20,3 +20,8 @@ class constants(Enum): # pylint: disable=c0103 schedule_cron=create_daily_cron(hour=1), initial_datetime=datetime(2024, 9, 1, 0, 0, 0), ) + + MATRIZ_INTEGRACAO_SMTR_SELECTOR = DBTSelector( + name="matriz_integracao_smtr", + initial_datetime=datetime(2024, 12, 30, 0, 0, 0), + ) diff --git a/pipelines/treatment/planejamento/flows.py b/pipelines/treatment/planejamento/flows.py index 82922bedc..55971598f 100644 --- a/pipelines/treatment/planejamento/flows.py +++ b/pipelines/treatment/planejamento/flows.py @@ -14,3 +14,10 @@ selector=constants.PLANEJAMENTO_DIARIO_SELECTOR.value, agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value, ) + +MATRIZ_INTEGRACAO_SMTR_MATERIALIZACAO = create_default_materialization_flow( + flow_name="matriz_integracao_smtr - materializacao", + selector=constants.MATRIZ_INTEGRACAO_SMTR_SELECTOR.value, + agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value, + generate_schedule=False, +) diff --git a/pipelines/treatment/templates/utils.py b/pipelines/treatment/templates/utils.py index 4e130e5a4..8f61baf1f 100644 --- a/pipelines/treatment/templates/utils.py +++ b/pipelines/treatment/templates/utils.py @@ -39,8 +39,8 @@ class DBTSelector: def __init__( self, name: str, - schedule_cron: str, initial_datetime: datetime, + schedule_cron: str = None, incremental_delay_hours: int = 0, ): self.name = name @@ -98,6 +98,8 @@ def is_up_to_date(self, env: str, timestamp: datetime) -> bool: Returns: bool: se está atualizado ou não """ + if self.schedule_cron is None: + raise ValueError("O selector não possui agendamento") last_materialization = self.get_last_materialized_datetime(env=env) last_schedule = cron_get_last_date(cron_expr=self.schedule_cron, timestamp=timestamp) @@ -115,6 +117,8 @@ def get_next_schedule_datetime(self, timestamp: datetime) -> datetime: Returns: datetime: próximo datetime do cron """ + if self.schedule_cron is None: + raise ValueError("O selector não possui agendamento") return cron_get_next_date(cron_expr=self.schedule_cron, timestamp=timestamp) def set_redis_materialized_datetime(self, env: str, timestamp: datetime): diff --git a/pipelines/treatment/validacao_dados_jae/CHANGELOG.md b/pipelines/treatment/validacao_dados_jae/CHANGELOG.md new file mode 100644 index 000000000..1d59edb6c --- /dev/null +++ b/pipelines/treatment/validacao_dados_jae/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - validacao_dados_jae + +## [1.0.0] - 2024-12-30 + +### Adicionado + +- Cria flow de materialização da validação dos dados da Jaé (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) \ No newline at end of file diff --git a/pipelines/treatment/validacao_dados_jae/__init__.py b/pipelines/treatment/validacao_dados_jae/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/treatment/validacao_dados_jae/constants.py b/pipelines/treatment/validacao_dados_jae/constants.py new file mode 100644 index 000000000..f6032b087 --- /dev/null +++ b/pipelines/treatment/validacao_dados_jae/constants.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +""" +Valores constantes para materialização da validação dos dados da Jaé +""" + +from datetime import datetime +from enum import Enum + +from pipelines.schedules import create_daily_cron +from pipelines.treatment.templates.utils import DBTSelector + + +class constants(Enum): # pylint: disable=c0103 + """ + Valores constantes para materialização da validação dos dados da Jaé + """ + + VALIDACAO_DADOS_JAE_SELECTOR = DBTSelector( + name="validacao_dados_jae", + schedule_cron=create_daily_cron(hour=7), + initial_datetime=datetime(2024, 12, 30, 0, 0, 0), + ) diff --git a/pipelines/treatment/validacao_dados_jae/flows.py b/pipelines/treatment/validacao_dados_jae/flows.py new file mode 100644 index 000000000..154c97775 --- /dev/null +++ b/pipelines/treatment/validacao_dados_jae/flows.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +""" +Flows de tratamento dos dados da validação dos dados da Jaé +""" + +from pipelines.constants import constants as smtr_constants +from pipelines.migration.br_rj_riodejaneiro_bilhetagem.constants import ( + constants as bilhetagem_constants, +) +from pipelines.schedules import create_daily_cron, create_hourly_cron +from pipelines.treatment.templates.flows import create_default_materialization_flow +from pipelines.treatment.validacao_dados_jae.constants import constants + +transacao_materializacao_params = ( + bilhetagem_constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value +) + +integracao_materializacao_params = ( + bilhetagem_constants.BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS.value +) + +VALIDACAO_DADOS_JAE_MATERIALIZACAO = create_default_materialization_flow( + flow_name="validacao_dados_jae - materializacao", + selector=constants.VALIDACAO_DADOS_JAE_SELECTOR.value, + agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value, + wait=[ + { + "redis_key": f"{integracao_materializacao_params['dataset_id']}\ +.{integracao_materializacao_params['table_id']}", + "dict_key": "last_run_timestamp", + "datetime_format": "%Y-%m-%dT%H:%M:%S", + "delay_hours": integracao_materializacao_params["dbt_vars"]["date_range"][ + "delay_hours" + ], + "schedule_cron": create_daily_cron(hour=5), + }, + { + "redis_key": f"{transacao_materializacao_params['dataset_id']}\ +.{transacao_materializacao_params['table_id']}", + "dict_key": "last_run_timestamp", + "datetime_format": "%Y-%m-%dT%H:%M:%S", + "delay_hours": transacao_materializacao_params["dbt_vars"]["date_range"]["delay_hours"], + "schedule_cron": create_hourly_cron(), + }, + ], +) diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md index 64ffecb31..5e365f462 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog - bilhetagem +## [2.0.2] - 2024-12-30 + +### Removido +- Move `matriz_integracao.sql` para o planejamento (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) + + +### Alterado +- Muda deduplicação de integração para considerar integrações diferentes que compartilham transações nos modelos `transacao.sql` e `integracao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) + ## [2.0.1] - 2024-12-12 ### Alterado diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql index 805e11071..fd41b4778 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/integracao.sql @@ -1,148 +1,147 @@ -- depends_on: {{ ref('matriz_integracao') }} {{ - config( - materialized="incremental", - partition_by={ - "field":"data", - "data_type":"date", - "granularity": "day" - }, - unique_key='id_transacao', - ) + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + unique_key="id_transacao", + ) }} -WITH integracao_transacao_deduplicada AS ( - SELECT - * EXCEPT(rn) - FROM - ( - SELECT - *, - ROW_NUMBER() OVER (PARTITION BY id ORDER BY timestamp_captura DESC) AS rn - FROM - {{ ref("staging_integracao_transacao") }} - {% if is_incremental() -%} - WHERE - DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") - AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}") - {%- endif %} - ) - WHERE +with + integracao_transacao_deduplicada as ( + select * except (rn) + from + ( + select + *, + row_number() over ( + partition by id order by timestamp_captura desc + ) as rn + from + {{ ref("staging_integracao_transacao") }} + {# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.integracao_transacao` #} + {% if is_incremental() -%} + where + date(data) between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + and timestamp_captura + between datetime("{{var('date_range_start')}}") and datetime( + "{{var('date_range_end')}}" + ) + {%- endif %} + ) + where rn = 1 + ), + integracao_melt as ( + select + extract(date from im.data_transacao) as data, + extract(hour from im.data_transacao) as hora, + i.data_inclusao as datetime_inclusao, + i.data_processamento as datetime_processamento_integracao, + i.timestamp_captura as datetime_captura, + i.id as id_integracao, + im.sequencia_integracao, + im.data_transacao as datetime_transacao, + im.id_tipo_modal, + im.id_consorcio, + im.id_operadora, + im.id_linha, + im.id_transacao, + im.sentido, + im.perc_rateio, + im.valor_rateio_compensacao, + im.valor_rateio, + im.valor_transacao, + i.valor_transacao_total, + i.tx_adicional as texto_adicional + from + integracao_transacao_deduplicada i, + -- Transforma colunas com os dados de cada transação da integração em + -- linhas diferentes + unnest( + [ + {% for n in range(var("quantidade_integracoes_max")) %} + struct( + {% for column, column_config in var( + "colunas_integracao" + ).items() %} + {% if column_config.select %} + {{ column }}_t{{ n }} as {{ column }}, + {% endif %} + {% endfor %} + {{ n + 1 }} as sequencia_integracao + ) + {% if not loop.last %},{% endif %} + {% endfor %} + ] + ) as im + ), + integracao_rn as ( + select + i.data, + i.hora, + i.datetime_processamento_integracao, + i.datetime_captura, + i.datetime_transacao, + timestamp_diff( + i.datetime_transacao, + lag(i.datetime_transacao) over ( + partition by i.id_integracao order by sequencia_integracao + ), + minute + ) as intervalo_integracao, + i.id_integracao, + i.sequencia_integracao, + m.modo, + dc.id_consorcio, + dc.consorcio, + do.id_operadora, + do.operadora, + i.id_linha as id_servico_jae, + l.nr_linha as servico_jae, + l.nm_linha as descricao_servico_jae, + i.id_transacao, + i.sentido, + i.perc_rateio as percentual_rateio, + i.valor_rateio_compensacao, + i.valor_rateio, + i.valor_transacao, + i.valor_transacao_total, + i.texto_adicional, + '{{ var("version") }}' as versao, + row_number() over ( + partition by id_integracao, id_transacao + order by datetime_processamento_integracao desc + ) as rn + from integracao_melt i + left join + {{ source("cadastro", "modos") }} m + on i.id_tipo_modal = m.id_modo + and m.fonte = "jae" + left join + {{ ref("operadoras") }} do on i.id_operadora = do.id_operadora_jae + {# `rj-smtr.cadastro.operadoras` do on i.id_operadora = do.id_operadora_jae #} + left join + {{ ref("consorcios") }} dc on i.id_consorcio = dc.id_consorcio_jae + {# `rj-smtr.cadastro.consorcios` dc on i.id_consorcio = dc.id_consorcio_jae #} + left join + {{ ref("staging_linha") }} l + {# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.linha` l #} + on i.id_linha = l.cd_linha + where i.id_transacao is not null + ), + integracoes_teste_invalidas as ( + select distinct i.id_integracao + from integracao_rn i + left join + {{ ref("staging_linha_sem_ressarcimento") }} l + {# `rj-smtr.br_rj_riodejaneiro_bilhetagem_staging.linha_sem_ressarcimento` l #} + on i.id_servico_jae = l.id_linha + where l.id_linha is not null or i.data < "2023-07-17" + ) +select * except (rn) +from integracao_rn +where rn = 1 -), -integracao_melt AS ( - SELECT - EXTRACT(DATE FROM im.data_transacao) AS data, - EXTRACT(HOUR FROM im.data_transacao) AS hora, - i.data_inclusao AS datetime_inclusao, - i.data_processamento AS datetime_processamento_integracao, - i.timestamp_captura AS datetime_captura, - i.id AS id_integracao, - im.sequencia_integracao, - im.data_transacao AS datetime_transacao, - im.id_tipo_modal, - im.id_consorcio, - im.id_operadora, - im.id_linha, - im.id_transacao, - im.sentido, - im.perc_rateio, - im.valor_rateio_compensacao, - im.valor_rateio, - im.valor_transacao, - i.valor_transacao_total, - i.tx_adicional AS texto_adicional - FROM - integracao_transacao_deduplicada i, - -- Transforma colunas com os dados de cada transação da integração em linhas diferentes - UNNEST( - [ - {% for n in range(var('quantidade_integracoes_max')) %} - STRUCT( - {% for column, column_config in var('colunas_integracao').items() %} - {% if column_config.select %} - {{ column }}_t{{ n }} AS {{ column }}, - {% endif %} - {% endfor %} - {{ n + 1 }} AS sequencia_integracao - ){% if not loop.last %},{% endif %} - {% endfor %} - ] - ) AS im -), -integracao_rn AS ( - SELECT - i.data, - i.hora, - i.datetime_processamento_integracao, - i.datetime_captura, - i.datetime_transacao, - TIMESTAMP_DIFF( - i.datetime_transacao, - LAG(i.datetime_transacao) OVER(PARTITION BY i.id_integracao ORDER BY sequencia_integracao), - MINUTE - ) AS intervalo_integracao, - i.id_integracao, - i.sequencia_integracao, - m.modo, - dc.id_consorcio, - dc.consorcio, - do.id_operadora, - do.operadora, - i.id_linha AS id_servico_jae, - -- s.servico, - l.nr_linha AS servico_jae, - l.nm_linha AS descricao_servico_jae, - i.id_transacao, - i.sentido, - i.perc_rateio AS percentual_rateio, - i.valor_rateio_compensacao, - i.valor_rateio, - i.valor_transacao, - i.valor_transacao_total, - i.texto_adicional, - '{{ var("version") }}' as versao, - ROW_NUMBER() OVER (PARTITION BY id_transacao ORDER BY datetime_processamento_integracao DESC) AS rn - FROM - integracao_melt i - LEFT JOIN - {{ source("cadastro", "modos") }} m - ON - i.id_tipo_modal = m.id_modo AND m.fonte = "jae" - LEFT JOIN - {{ ref("operadoras") }} AS do - ON - i.id_operadora = do.id_operadora_jae - LEFT JOIN - {{ ref("consorcios") }} AS dc - ON - i.id_consorcio = dc.id_consorcio_jae - LEFT JOIN - {{ ref("staging_linha") }} AS l - ON - i.id_linha = l.cd_linha - -- LEFT JOIN - -- {{ ref("servicos") }} AS s - -- ON - -- i.id_linha = s.id_servico_jae - WHERE i.id_transacao IS NOT NULL -), -integracoes_teste_invalidas AS ( - SELECT DISTINCT - i.id_integracao - FROM - integracao_rn i - LEFT JOIN - {{ ref("staging_linha_sem_ressarcimento") }} l - ON - i.id_servico_jae = l.id_linha - WHERE - l.id_linha IS NOT NULL - OR i.data < "2023-07-17" -) -SELECT - * EXCEPT(rn) -FROM - integracao_rn -WHERE rn = 1 -AND id_integracao NOT IN (SELECT id_integracao FROM integracoes_teste_invalidas) \ No newline at end of file + and id_integracao not in (select id_integracao from integracoes_teste_invalidas) diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/matriz_integracao.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/matriz_integracao.sql deleted file mode 100644 index 1f1f42fea..000000000 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/matriz_integracao.sql +++ /dev/null @@ -1,46 +0,0 @@ -{{ - config( - materialized="table", - ) -}} - -WITH matriz_melt AS ( - SELECT - i.id, - im.sequencia_integracao, - im.id_tipo_modal, - im.perc_rateio, - i.dt_inicio_validade, - i.dt_fim_validade - FROM - {{ ref("staging_percentual_rateio_integracao") }} i, - -- Transforma colunas com os dados de cada modo da integração em linhas diferentes - UNNEST( - [ - {% for n in range(var('quantidade_integracoes_max')) %} - STRUCT( - {% for column in ['id_tipo_modal' , 'perc_rateio'] %} - {{ column }}{{ '_integracao_t'~n if n > 0 else '_origem' }} AS {{ column }}, - {% endfor %} - {{ n }} AS sequencia_integracao - ){% if not loop.last %},{% endif %} - {% endfor %} - ] - ) im -) -SELECT - i.id AS id_matriz_integracao, - i.sequencia_integracao, - m.modo, - i.perc_rateio AS percentual_rateio, - i.dt_inicio_validade AS data_inicio_validade, - i.dt_fim_validade AS data_fim_validade, - '{{ var("version") }}' as versao -FROM - matriz_melt i -LEFT JOIN - {{ source("cadastro", "modos") }} m -ON - i.id_tipo_modal = m.id_modo AND m.fonte = "jae" -WHERE - i.id_tipo_modal IS NOT NULL \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml b/queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml index 5ae100993..162b78e6e 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml @@ -145,23 +145,6 @@ models: description: "Observação adicional" - name: versao description: "Código de controle de versão do dado (SHA Github)" - - name: matriz_integracao - description: "Matriz de repartição tarifária de integrações válidas no sistema de transporte municipal" - columns: - - name: id_matriz_integracao - description: "Identificador único da integração" - - name: sequencia_integracao - description: "Sequencia da transação dentro da integração" - - name: modo - description: "Tipo de transporte (Ônibus, Van, BRT)" - - name: percentual_rateio - description: "Percentual de rateio do valor total para a operadora" - - name: data_inicio_validade - description: "Data de início da validade da matriz" - - name: data_fim_validade - description: "Data de fim da validade da matriz" - - name: versao - description: "Código de controle de versão do dado (SHA Github)" - name: gps_validador description: "Tabela de posições de GPS de todos os validadores da Jaé, incluindo estado do equipamento (Aberto/Fechado), serviço, sentido e veículo associado e temperatura do veículo." columns: @@ -599,4 +582,4 @@ models: - name: valor_transacao description: "Valor debitado na transação atual (R$)" - name: versao - description: "Código de controle de versão do dado (SHA Github)" \ No newline at end of file + description: "Código de controle de versão do dado (SHA Github)" diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql index 1c6573290..49ef7c41f 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql @@ -120,6 +120,12 @@ with {% else %} data = "2000-01-01" {% endif %} {% endif %} + qualify + row_number() over ( + partition by id_transacao + order by datetime_processamento_integracao desc + ) + = 1 ), transacao_ordem as ( select * diff --git a/queries/models/docs.md b/queries/models/docs.md index c36b4e7eb..e0a01c095 100644 --- a/queries/models/docs.md +++ b/queries/models/docs.md @@ -346,4 +346,8 @@ Indica que o valor encontra-se sob julgamento de ação judicial e será deposit {% docs tipo_os %} Tipo de Ordem de Serviço (ex: 'Regular', 'Extraordinária - Verão') +{% enddocs %} + +{% docs data_inicio_matriz %} +Data de inicio da versão da matriz de integração {% enddocs %} \ No newline at end of file diff --git a/queries/models/planejamento/CHANGELOG.md b/queries/models/planejamento/CHANGELOG.md index efa480149..879dcf80f 100644 --- a/queries/models/planejamento/CHANGELOG.md +++ b/queries/models/planejamento/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - planejamento +## [1.3.0] - 2024-12-30 + +### Adicionado + +- Cria modelo `matriz_integracao.sql` com a matriz publicada pela SMTR (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) + ## [1.2.0] - 2024-12-04 ### Adicionado diff --git a/queries/models/planejamento/matriz_integracao.sql b/queries/models/planejamento/matriz_integracao.sql new file mode 100644 index 000000000..9e0d0e992 --- /dev/null +++ b/queries/models/planejamento/matriz_integracao.sql @@ -0,0 +1,111 @@ +{{ + config( + materialized="table", + partition_by={ + "field": "data_inicio_matriz", + "data_type": "date", + "granularity": "day", + }, + ) +}} + +with + matriz_staging as ( + select + date(data_versao_matriz) as data_versao_matriz, + id_tipo_integracao, + primeira_perna, + cast( + if( + trim(porcentagem_primeira_perna) = '', + null, + porcentagem_primeira_perna + ) as numeric + ) + / 100 as porcentagem_primeira_perna, + segunda_perna, + cast( + if( + trim(porcentagem_segunda_perna) = '', + null, + porcentagem_segunda_perna + ) as numeric + ) + / 100 as porcentagem_segunda_perna, + terceira_perna, + cast( + if( + trim(porcentagem_terceira_perna) = '', + null, + porcentagem_terceira_perna + ) as numeric + ) + / 100 as porcentagem_terceira_perna, + cast(tempo_integracao_minutos as float64) as tempo_integracao_minutos + from {{ source("source_smtr", "matriz_integracao") }} + ), + data_versao as ( + select distinct data_versao_matriz as data_inicio_matriz from matriz_staging + ), + data_fim as ( + select + data_inicio_matriz, + date_sub( + lead(data_inicio_matriz) over (order by data_inicio_matriz), + interval 1 day + ) as data_fim_matriz + from data_versao + ), + matriz as ( + select + mi.data_versao_matriz as data_inicio_matriz, + mi.id_tipo_integracao as id_matriz_integracao, + p.sequencia_integracao, + if(trim(p.modo) = '', null, p.modo) as modo, + p.percentual_rateio, + case + when mi.terceira_perna is not null and trim(mi.terceira_perna) != '' + then [mi.primeira_perna, mi.segunda_perna, mi.terceira_perna] + else [mi.primeira_perna, mi.segunda_perna] + end as sequencia_completa_modo, + case + when mi.porcentagem_terceira_perna is not null + then + [ + mi.porcentagem_primeira_perna, + mi.porcentagem_segunda_perna, + mi.porcentagem_terceira_perna + ] + else [mi.porcentagem_primeira_perna, mi.porcentagem_segunda_perna] + end as sequencia_completa_rateio, + mi.tempo_integracao_minutos + from + matriz_staging mi, + unnest( + [ + struct( + primeira_perna as modo, + porcentagem_primeira_perna as percentual_rateio, + 1 as sequencia_integracao + ), + struct( + segunda_perna as modo, + porcentagem_segunda_perna as percentual_rateio, + 2 as sequencia_integracao + ), + struct( + terceira_perna as modo, + porcentagem_terceira_perna as percentual_rateio, + 3 as sequencia_integracao + ) + ] + ) p + ) +select + data_inicio_matriz, + d.data_fim_matriz, + m.* except (data_inicio_matriz), + '{{ var("version") }}' as versao +from matriz m +join data_fim d using (data_inicio_matriz) +where modo is not null diff --git a/queries/models/planejamento/schema.yml b/queries/models/planejamento/schema.yml index 00fed4a53..bcc48243b 100644 --- a/queries/models/planejamento/schema.yml +++ b/queries/models/planejamento/schema.yml @@ -269,3 +269,45 @@ models: description: "{{ doc('datetime_ultima_atualizacao') }}" data_type: datetime quote: true + - name: matriz_integracao + description: "Matriz de repartição tarifária de integrações válidas no sistema de transporte municipal" + columns: + - name: id_matriz_integracao + description: "Identificador único da integração" + data_type: string + - name: sequencia_integracao + description: "Sequencia da transação dentro da integração" + data_type: int64 + - name: modo + description: "Tipo de transporte (Ônibus, Van, BRT)" + data_type: string + - name: percentual_rateio + description: "Percentual de rateio do valor total para a operadora" + data_type: numeric + - name: data_inicio_validade + description: "Data de início da validade da matriz" + - name: data_fim_validade + description: "Data de fim da validade da matriz" + - name: versao + description: "{{ doc('versao') }}" + data_type: string + - name: data_inicio_matriz + description: "{{ doc('data_inicio_matriz') }}" + data_type: date + quote: true + - name: data_fim_matriz + description: Data de fim da versão da matriz de integração + data_type: date + quote: true + - name: sequencia_completa_modo + description: Lista com o conjunto de modos da integração completa + data_type: string + quote: true + - name: sequencia_completa_rateio + description: Lista com o conjunto de valores de rateio da integração completa + data_type: numeric + quote: true + - name: tempo_integracao_minutos + description: Tempo máximo entre a primeira e a última perna para a integração ser realizada + data_type: float64 + quote: true \ No newline at end of file diff --git a/queries/models/sources.yml b/queries/models/sources.yml index 44fd47259..1047cb75b 100644 --- a/queries/models/sources.yml +++ b/queries/models/sources.yml @@ -190,3 +190,8 @@ sources: tables: - name: viagem_informada + - name: source_smtr + database: rj-smtr-dev + + tables: + - name: matriz_integracao diff --git a/queries/models/validacao_dados_jae/CHANGELOG.md b/queries/models/validacao_dados_jae/CHANGELOG.md index e94e1a1a6..81e3f7e3a 100644 --- a/queries/models/validacao_dados_jae/CHANGELOG.md +++ b/queries/models/validacao_dados_jae/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog - validacao_dados_jae +## [2.0.0] - 2024-12-30 + +### Alterado + +- Altera modelos `integracao_nao_realizada.sql` e `integracao_invalida.sql` para considerar a matriz publicada pela SMTR (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) +- Altera variável de data dos modelos `integracao_nao_realizada.sql` e `integracao_invalida.sql` de `run_date` para `date_range_[start/end]` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/371) + ## [1.1.3] - 2024-09-04 ### Alterado diff --git a/queries/models/validacao_dados_jae/integracao_invalida.sql b/queries/models/validacao_dados_jae/integracao_invalida.sql index 612b13d73..a251313f9 100644 --- a/queries/models/validacao_dados_jae/integracao_invalida.sql +++ b/queries/models/validacao_dados_jae/integracao_invalida.sql @@ -1,85 +1,135 @@ {{ - config( - incremental_strategy="insert_overwrite", - partition_by={ - "field": "data", - "data_type": "date", - "granularity": "day" - }, - ) + config( + incremental_strategy="insert_overwrite", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + ) }} -{% set integracao_table = ref('integracao') %} +{% set integracao_table = ref("integracao") %} {% if execute %} - {% if is_incremental() %} + {% if is_incremental() %} - {% set partitions_query %} + {% set partitions_query %} SELECT CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data FROM `{{ integracao_table.database }}.{{ integracao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + {# `rj-smtr.br_rj_riodejaneiro_bilhetagem.INFORMATION_SCHEMA.PARTITIONS` #} WHERE table_name = "{{ integracao_table.identifier }}" AND partition_id != "__NULL__" - AND DATE(last_modified_time, "America/Sao_Paulo") = DATE_SUB(DATE("{{var('run_date')}}"), INTERVAL 1 DAY) - {% endset %} + AND DATE(last_modified_time, "America/Sao_Paulo") BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") + {% endset %} - {{ log("Running query: \n"~partitions_query, info=True) }} - {% set partitions = run_query(partitions_query) %} + {{ log("Running query: \n" ~ partitions_query, info=True) }} + {% set partitions = run_query(partitions_query) %} - {% set partition_list = partitions.columns[0].values() %} - {{ log("integracao partitions: \n"~partition_list, info=True) }} - {% endif %} + {% set partition_list = partitions.columns[0].values() %} + {{ log("integracao partitions: \n" ~ partition_list, info=True) }} + {% endif %} {% endif %} -WITH sequencias_validas AS ( - SELECT - id_matriz_integracao, - STRING_AGG(modo, ', ' ORDER BY sequencia_integracao) AS modos - FROM - {{ ref("matriz_integracao") }} - GROUP BY - id_matriz_integracao -), -integracao_agg AS ( - SELECT - DATE(datetime_processamento_integracao) AS data, - id_integracao, - STRING_AGG(modo, ', ' ORDER BY sequencia_integracao) AS modos, - MIN(datetime_transacao) AS datetime_primeira_transacao, - MAX(datetime_transacao) AS datetime_ultima_transacao, - MIN(intervalo_integracao) AS menor_intervalo - FROM - {{ ref("integracao") }} - {% if is_incremental() %} - WHERE - {% if partition_list|length > 0 %} - data IN ({{ partition_list|join(', ') }}) - {% else %} - data = "2000-01-01" - {% endif %} - {% endif %} - GROUP BY - 1, - 2 -), -indicadores AS ( - SELECT - data, - id_integracao, - modos, - modos NOT IN (SELECT DISTINCT modos FROM sequencias_validas) AS indicador_fora_matriz, - TIMESTAMP_DIFF(datetime_ultima_transacao, datetime_primeira_transacao, MINUTE) > 180 AS indicador_tempo_integracao_invalido, - menor_intervalo < 5 AS indicador_intervalo_transacao_suspeito - FROM - integracao_agg -) -SELECT - *, - '{{ var("version") }}' as versao -FROM - indicadores -WHERE - indicador_fora_matriz = TRUE - OR indicador_tempo_integracao_invalido = TRUE - OR indicador_intervalo_transacao_suspeito = TRUE \ No newline at end of file +with + matriz as ( + select distinct + data_inicio_matriz, + data_fim_matriz, + array_to_string(sequencia_completa_modo, ', ') as modos, + to_json_string(sequencia_completa_rateio) as rateio, + tempo_integracao_minutos + from {{ ref("matriz_integracao") }} + ), + versao_matriz as (select distinct data_inicio_matriz, data_fim_matriz from matriz), + integracao_agg as ( + select + date(datetime_processamento_integracao) as data, + id_integracao, + string_agg( + case + when modo = 'Van' + then consorcio + when modo = 'Ônibus' + then 'SPPO' + else modo + end, + ', ' + order by sequencia_integracao + ) as modos, + to_json_string( + array_agg( + cast(percentual_rateio as numeric) order by sequencia_integracao + ) + ) as rateio, + min(datetime_transacao) as datetime_primeira_transacao, + max(datetime_transacao) as datetime_ultima_transacao, + min(intervalo_integracao) as menor_intervalo + from {{ ref("integracao") }} + {# from `rj-smtr.br_rj_riodejaneiro_bilhetagem.integracao` #} + {% if is_incremental() %} + where + {% if partition_list | length > 0 %} + data in ({{ partition_list | join(", ") }}) + {% else %} data = "2000-01-01" + {% endif %} + {% endif %} + group by 1, 2 + ), + integracao_matriz as ( + select + i.data, + i.id_integracao, + i.modos, + i.rateio, + i.datetime_primeira_transacao, + i.datetime_ultima_transacao, + i.menor_intervalo, + m.modos as modos_matriz, + m.rateio as rateio_matriz, + m.tempo_integracao_minutos, + v.data_inicio_matriz + from integracao_agg i + left join + matriz m + on i.data >= m.data_inicio_matriz + and (i.data <= m.data_fim_matriz or m.data_fim_matriz is null) + and i.modos = m.modos + left join + versao_matriz v + on i.data >= v.data_inicio_matriz + and (i.data <= v.data_fim_matriz or v.data_fim_matriz is null) + ), + indicadores as ( + select + data, + id_integracao, + modos, + modos_matriz is null as indicador_fora_matriz, + case + when modos_matriz is null + then null + else + timestamp_diff( + datetime_ultima_transacao, datetime_primeira_transacao, minute + ) + > tempo_integracao_minutos + end as indicador_tempo_integracao_invalido, + case + when modos_matriz is null then null else rateio != rateio_matriz + end as indicador_rateio_invalido, + rateio, + rateio_matriz, + data_inicio_matriz + from integracao_matriz + ) +select + *, + '{{ var("version") }}' as versao, + current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao +from indicadores +where + ( + indicador_fora_matriz + or indicador_tempo_integracao_invalido + or indicador_rateio_invalido + ) + and data >= (select min(data_inicio_matriz) from versao_matriz) diff --git a/queries/models/validacao_dados_jae/integracao_nao_realizada.sql b/queries/models/validacao_dados_jae/integracao_nao_realizada.sql index 94ee0ebeb..289dc312d 100644 --- a/queries/models/validacao_dados_jae/integracao_nao_realizada.sql +++ b/queries/models/validacao_dados_jae/integracao_nao_realizada.sql @@ -1,41 +1,42 @@ {{ - config( - materilized="incremental", - incremental_strategy="insert_overwrite", - partition_by={ - "field": "data", - "data_type": "date", - "granularity": "day" - }, - ) + config( + materilized="incremental", + incremental_strategy="insert_overwrite", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + ) }} -{% set transacao_table = ref('transacao') %} +{% set transacao_table = ref("transacao") %} {% if execute %} - {% if is_incremental() %} + {% if is_incremental() %} - {% set partitions_query %} + {% set partitions_query %} SELECT CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data FROM `{{ transacao_table.database }}.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` - -- `rj-smtr.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + {# `rj-smtr.br_rj_riodejaneiro_bilhetagem.INFORMATION_SCHEMA.PARTITIONS` #} WHERE table_name = "{{ transacao_table.identifier }}" AND partition_id != "__NULL__" - AND DATE(last_modified_time, "America/Sao_Paulo") BETWEEN DATE_SUB(DATE("{{var('run_date')}}"), INTERVAL 1 DAY) AND DATE("{{var('run_date')}}") - {% endset %} + AND DATE(last_modified_time, "America/Sao_Paulo") BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") + {% endset %} - {{ log("Running query: \n"~partitions_query, info=True) }} - {% set partitions = run_query(partitions_query) %} + {{ log("Running query: \n" ~ partitions_query, info=True) }} + {% set partitions = run_query(partitions_query) %} - {% set partition_list = partitions.columns[0].values() %} - {{ log("transacao partitions: \n"~partition_list, info=True) }} - {% endif %} + {% set partition_list = partitions.columns[0].values() %} + {{ log("transacao partitions: \n" ~ partition_list, info=True) }} + {% endif %} {% endif %} -{% set max_transactions = var('quantidade_integracoes_max') %} -- Número máximo de pernas em uma integração -{% set pivot_columns = ["datetime_transacao", "id_transacao", "modo", "servico_sentido"] %} +{% set max_transactions = var("quantidade_integracoes_max") %} -- Número máximo de pernas em uma integração +{% set pivot_columns = [ + "datetime_transacao", + "id_transacao", + "modo", + "servico_sentido", +] %} {% set transaction_date_filter %} {% if partition_list|length > 0 %} {% for p in partition_list %} @@ -55,198 +56,217 @@ {% endif %} {% endset %} -WITH matriz AS ( - SELECT - STRING_AGG(modo order by sequencia_integracao) AS sequencia_valida - FROM - {{ ref("matriz_integracao") }} - -- `rj-smtr.br_rj_riodejaneiro_bilhetagem.matriz_integracao` - group by id_matriz_integracao -), -transacao AS ( - SELECT - id_cliente, - {% for column in pivot_columns %} - {% if column == "servico_sentido" %} - CONCAT(id_servico_jae, '_', sentido) AS servico_sentido, - {% else %} - {{ column }}, - {% endif %} - {% endfor %} - FROM - {{ ref("transacao") }} - -- `rj-smtr.br_rj_riodejaneiro_bilhetagem.transacao` - WHERE - data < CURRENT_DATE("America/Sao_Paulo") - AND tipo_transacao != "Gratuidade" - {% if is_incremental() %} - AND - {{ transaction_date_filter }} - {% endif %} -), -transacao_agrupada AS ( - SELECT - id_cliente, - -- Cria o conjunto de colunas para a transação atual e as 4 próximas transações do cliente - {% for column in pivot_columns %} - {% for transaction_number in range(max_transactions) %} - {% if loop.first %} - {{ column }} AS {{ column }}_{{ transaction_number }}, - {% else %} - LEAD({{ column }}, {{ loop.index0 }}) OVER (PARTITION BY id_cliente ORDER BY datetime_transacao) AS {{ column }}_{{ transaction_number }}, - {% endif %} - {% endfor %} - {% endfor %} - FROM - transacao -), -integracao_possivel AS ( - SELECT - *, - {% set modos = ["modo_0"] %} - {% set servicos = ["servico_sentido_0"] %} - {% for transaction_number in range(1, max_transactions) %} - {% do modos.append("modo_"~transaction_number) %} - ( - DATETIME_DIFF(datetime_transacao_{{ transaction_number }}, datetime_transacao_0, MINUTE) <= 180 - AND CONCAT({{ modos|join(", ',', ") }}) IN (SELECT sequencia_valida FROM matriz) - {% if loop.first %} - AND servico_sentido_{{ transaction_number }} != servico_sentido_0 - {% else %} - AND servico_sentido_{{ transaction_number }} NOT IN ({{ servicos|join(", ',', ") }}) - {% endif %} - ) AS indicador_integracao_{{ transaction_number }}, - {% do servicos.append("servico_sentido_"~transaction_number) %} +with + matriz as ( + select + data_inicio_matriz, + data_fim_matriz, + array_agg(array_to_string(sequencia_completa_modo, ',')) as sequencia_valida + from {{ ref("matriz_integracao") }} + -- `rj-smtr.br_rj_riodejaneiro_bilhetagem.matriz_integracao` + group by data_inicio_matriz, data_fim_matriz + ), + transacao as ( + select + t.id_cliente, + {% for column in pivot_columns %} + {% if column == "servico_sentido" %} + concat(t.id_servico_jae, '_', t.sentido) as servico_sentido, + {% elif column == "modo" %} + case + when t.modo = 'Van' + then t.consorcio + when t.modo = 'Ônibus' + then 'SPPO' + else t.modo + end as modo, + {% else %} {{ column }}, + {% endif %} + {% endfor %} + m.data_inicio_matriz, + m.sequencia_valida + from {{ ref("transacao") }} t + {# from `rj-smtr.br_rj_riodejaneiro_bilhetagem.transacao` t #} + join + matriz m + on t.data >= m.data_inicio_matriz + and (t.data <= m.data_fim_matriz or m.data_fim_matriz is null) + where + t.data < current_date("America/Sao_Paulo") + and t.tipo_transacao != "Gratuidade" + and t.id_cliente is not null + and t.id_cliente != '733' + {% if is_incremental() %} and ({{ transaction_date_filter }}) {% endif %} + ), + transacao_agrupada as ( + select + id_cliente, + -- Cria o conjunto de colunas para a transação atual e as 4 próximas + -- transações do cliente + {% for column in pivot_columns %} + {% for transaction_number in range(max_transactions) %} + {% if loop.first %} + {{ column }} as {{ column }}_{{ transaction_number }}, + {% else %} + lead({{ column }}, {{ loop.index0 }}) over ( + partition by id_cliente order by datetime_transacao + ) as {{ column }}_{{ transaction_number }}, + {% endif %} + {% endfor %} + {% endfor %} + data_inicio_matriz, + sequencia_valida + from transacao + ), + integracao_possivel as ( + select + *, + {% set modos = ["modo_0"] %} + {% set servicos = ["servico_sentido_0"] %} + {% for transaction_number in range(1, max_transactions) %} + {% do modos.append("modo_" ~ transaction_number) %} + ( + datetime_diff( + datetime_transacao_{{ transaction_number }}, + datetime_transacao_0, + minute + ) + <= 180 + and concat({{ modos | join(", ',', ") }}) + in unnest(sequencia_valida) + {% if loop.first %} + and servico_sentido_{{ transaction_number }} + != servico_sentido_0 + {% else %} + and servico_sentido_{{ transaction_number }} + not in ({{ servicos | join(", ',', ") }}) + {% endif %} + ) as indicador_integracao_{{ transaction_number }}, + {% do servicos.append("servico_sentido_" ~ transaction_number) %} - {% endfor %} - FROM - transacao_agrupada - WHERE - id_transacao_1 IS NOT NULL -), -transacao_filtrada AS ( - SELECT - id_cliente, - {% for column in pivot_columns %} - {% for transaction_number in range(max_transactions) %} - {% if transaction_number < 2 %} - {{ column }}_{{ transaction_number }}, - {% else %} - CASE - WHEN - indicador_integracao_{{ transaction_number }} THEN {{ column }}_{{ transaction_number }} - END AS {{ column }}_{{ transaction_number }}, - {% endif %} - {% endfor %} - {% endfor %} - indicador_integracao_1, - indicador_integracao_2, - indicador_integracao_3, - indicador_integracao_4 - FROM - integracao_possivel - WHERE - indicador_integracao_1 -), -transacao_listada AS ( - SELECT - *, - ARRAY_TO_STRING( - [ - {% for i in range(1, max_transactions) %} - id_transacao_{{ i }} {% if not loop.last %},{% endif %} - {% endfor %} - ], - ", " - ) AS transacoes - FROM - transacao_filtrada -), -{% for i in range(max_transactions - 1) %} - validacao_integracao_{{ max_transactions - i }}_pernas AS ( - SELECT - ( - id_transacao_{{ max_transactions - i - 1 }} IS NOT NULL - {% if not loop.first %} - AND id_transacao_{{ max_transactions - i }} IS NULL - {% endif %} - AND id_transacao_0 IN UNNEST( - SPLIT( - STRING_AGG(transacoes, ", ") OVER (PARTITION BY id_cliente ORDER BY datetime_transacao_0 ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING), - ', ') - ) - ) AS remover_{{ max_transactions - i }}, - * - FROM - {% if loop.first %} - transacao_listada - {% else %} - validacao_integracao_{{ max_transactions - i + 1 }}_pernas - WHERE - NOT remover_{{ max_transactions - i + 1 }} - {% endif %} - ), -{% endfor %} -integracoes_validas AS ( - SELECT - DATE(datetime_transacao_0) AS data, - id_transacao_0 AS id_integracao, - * - FROM - validacao_integracao_2_pernas - WHERE - NOT remover_2 -), -melted AS ( - SELECT - data, - id_integracao, - sequencia_integracao, - datetime_transacao, - id_transacao, - modo, - SPLIT(servico_sentido, '_')[0] AS id_servico_jae, - SPLIT(servico_sentido, '_')[1] AS sentido, - COUNTIF(modo = "BRT") OVER(PARTITION BY id_integracao) > 1 AS indicador_transferencia - FROM - integracoes_validas, - UNNEST( - [ - {% for transaction_number in range(max_transactions) %} - STRUCT( + {% endfor %} + from transacao_agrupada + where id_transacao_1 is not null + ), + transacao_filtrada as ( + select + id_cliente, {% for column in pivot_columns %} - {{ column }}_{{ transaction_number }} AS {{ column }}, + {% for transaction_number in range(max_transactions) %} + {% if transaction_number < 2 %} + {{ column }}_{{ transaction_number }}, + {% else %} + case + when indicador_integracao_{{ transaction_number }} + then {{ column }}_{{ transaction_number }} + end as {{ column }}_{{ transaction_number }}, + {% endif %} + {% endfor %} {% endfor %} - {{ transaction_number + 1}} AS sequencia_integracao - ){%if not loop.last %},{% endif %} - {% endfor %} - ] - ) -), -integracao_nao_realizada AS ( - SELECT DISTINCT - id_integracao - FROM - melted - WHERE - NOT indicador_transferencia - AND id_transacao NOT IN ( - SELECT - id_transacao - FROM - {{ ref("integracao") }} - -- `rj-smtr.br_rj_riodejaneiro_bilhetagem.integracao` - {% if is_incremental() %} - WHERE - {{ transaction_date_filter }} - {% endif %} + indicador_integracao_1, + indicador_integracao_2, + indicador_integracao_3, + indicador_integracao_4, + data_inicio_matriz + from integracao_possivel + where indicador_integracao_1 + ), + transacao_listada as ( + select + *, + array_to_string( + [ + {% for i in range(1, max_transactions) %} + id_transacao_{{ i }} {% if not loop.last %},{% endif %} + {% endfor %} + ], + ", " + ) as transacoes + from transacao_filtrada + ), + {% for i in range(max_transactions - 1) %} + validacao_integracao_{{ max_transactions - i }}_pernas as ( + select + ( + id_transacao_{{ max_transactions - i - 1 }} is not null + {% if not loop.first %} + and id_transacao_{{ max_transactions - i }} is null + {% endif %} + and id_transacao_0 in unnest( + split( + string_agg(transacoes, ", ") over ( + partition by id_cliente + order by datetime_transacao_0 + rows between 5 preceding and 1 preceding + ), + ', ' + ) + ) + ) as remover_{{ max_transactions - i }}, + * + from {% if loop.first %} transacao_listada + {% else %} + validacao_integracao_{{ max_transactions - i + 1 }}_pernas + where not remover_{{ max_transactions - i + 1 }} + {% endif %} + ), + {% endfor %} + integracoes_validas as ( + select date(datetime_transacao_0) as data, id_transacao_0 as id_integracao, * + from validacao_integracao_2_pernas + where not remover_2 + ), + melted as ( + select + data, + id_integracao, + sequencia_integracao, + datetime_transacao, + id_transacao, + modo, + split(servico_sentido, '_')[0] as id_servico_jae, + split(servico_sentido, '_')[1] as sentido, + countif(modo = "BRT") over (partition by id_integracao) + > 1 as indicador_transferencia_brt, + countif(modo = "VLT") over (partition by id_integracao) + > 1 as indicador_transferencia_vlt, + data_inicio_matriz + from + integracoes_validas, + unnest( + [ + {% for transaction_number in range(max_transactions) %} + struct( + {% for column in pivot_columns %} + {{ column }}_{{ transaction_number }} as {{ column }}, + {% endfor %} + {{ transaction_number + 1 }} as sequencia_integracao + ) + {% if not loop.last %},{% endif %} + {% endfor %} + ] + ) + ), + integracao_nao_realizada as ( + select distinct id_integracao + from melted + where + not indicador_transferencia_brt + and not indicador_transferencia_vlt + and id_transacao not in ( + select id_transacao + from {{ ref("integracao") }} + -- `rj-smtr.br_rj_riodejaneiro_bilhetagem.integracao` + {% if is_incremental() %} + where {{ transaction_date_filter }} + {% endif %} + ) ) -) -SELECT - * EXCEPT(indicador_transferencia), - '{{ var("version") }}' as versao -FROM - melted -WHERE - id_integracao IN (SELECT id_integracao FROM integracao_nao_realizada) - AND id_transacao IS NOT NULL +select + * except (indicador_transferencia_brt, indicador_transferencia_vlt), + '{{ var("version") }}' as versao, + current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao +from melted +where + id_integracao in (select id_integracao from integracao_nao_realizada) + and id_transacao is not null diff --git a/queries/models/validacao_dados_jae/schema.yml b/queries/models/validacao_dados_jae/schema.yml index c448b3c55..6067e2bdb 100644 --- a/queries/models/validacao_dados_jae/schema.yml +++ b/queries/models/validacao_dados_jae/schema.yml @@ -6,18 +6,44 @@ models: columns: - name: data description: "Data de processamento da integração (partição)" + data_type: date - name: id_integracao description: "Identificador único da integração" + data_type: string - name: modos description: "Sequência modos das transações presentes na integração (separado por ', ')" + data_type: string - name: indicador_fora_matriz - description: "Indica se a sequência de modos não aparece na tabela br_rj_riodejaneiro_bilhetagem.matriz_integracao" + description: "Indica se a sequência de modos não aparece na tabela planejamento.matriz_integracao" + data_type: boolean - name: indicador_tempo_integracao_invalido - description: "Indica se o tempo entre a primeira e a última transação da integração é maior que 3 horas" + description: "Indica se o tempo entre a primeira e a última transação da integração é maior que o tempo limite de integração" + data_type: boolean - name: indicador_intervalo_transacao_suspeito description: "Indica se existe algum intervalo entre transações menor que 5 minutos dentro da integração" - name: versao description: "{{ doc('versao') }}" + data_type: string + - name: indicador_rateio_invalido + description: Indica se o rateio não foi feito de acordo com os valores estipulados na matriz + data_type: boolean + quote: true + - name: rateio + description: String no formato JSON com os valores de rateio praticados na integração + data_type: string + quote: true + - name: rateio_matriz + description: String no formato JSON com os valores de rateio estipulados pela matriz + data_type: string + quote: true + - name: data_inicio_matriz + description: "{{ doc('data_inicio_matriz') }}" + data_type: date + quote: true + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" + data_type: datetime + quote: true - name: transacao_invalida description: "Tabela para validação dos dados de transação da Jaé" columns: @@ -97,19 +123,36 @@ models: columns: - name: data description: "Data da transação (partição)." + data_type: date - name: id_integracao description: "Identificador único da integração (é igual ao id_transacao da primeira transação da integração)." + data_type: string - name: sequencia_integracao description: "Sequência da transação dentro da integração" + data_type: int64 - name: datetime_transacao description: "Data e hora da transação em GMT-3" + data_type: datetime - name: id_transacao description: "Identificador único da transação" + data_type: string - name: modo - description: "Tipo de transporte (Ônibus, Van, BRT)" + description: "{{ doc('modo') }}" + data_type: string - name: id_servico_jae description: "Identificador da linha no banco de dados da jaé (É possível cruzar os dados com a tabela rj-smtr.cadastro.servicos usando a coluna id_servico_jae)" + data_type: string - name: sentido description: "Sentido de operação do serviço (0 = ida, 1 = volta)" + data_type: string - name: versao - description: "{{ doc('versao') }}" \ No newline at end of file + description: "{{ doc('versao') }}" + data_type: string + - name: data_inicio_matriz + description: "{{ doc('data_inicio_matriz') }}" + data_type: date + quote: true + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" + data_type: datetime + quote: true diff --git a/queries/selectors.yml b/queries/selectors.yml index d6412f96f..27cd475f6 100644 --- a/queries/selectors.yml +++ b/queries/selectors.yml @@ -70,4 +70,19 @@ selectors: description: Materialização das views para o datario definition: method: fqn - value: datario \ No newline at end of file + value: datario + + - name: matriz_integracao_smtr + description: Materialização da matriz de integração da SMTR + definition: + method: fqn + value: matriz_integracao + + - name: validacao_dados_jae + description: Materialização das tabelas de validação dos dados da Jaé + definition: + union: + - method: fqn + value: integracao_invalida + - method: fqn + value: integracao_nao_realizada