From 786b1f4ebf32ba773300c8515559a4c614ef5875 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 17:38:48 -0300 Subject: [PATCH 01/54] cria modelo autuacao_serpro --- queries/models/sources.yml | 1 + queries/models/transito/schema.yml | 72 +++++++++++++++++++ .../transito/staging/autuacao_serpro.sql | 45 ++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 queries/models/transito/staging/autuacao_serpro.sql diff --git a/queries/models/sources.yml b/queries/models/sources.yml index b3be7abfb..f64f55184 100644 --- a/queries/models/sources.yml +++ b/queries/models/sources.yml @@ -164,4 +164,5 @@ sources: tables: - name: autuacoes_citran + - name: autuacoes_serpro - name: receita_autuacao \ No newline at end of file diff --git a/queries/models/transito/schema.yml b/queries/models/transito/schema.yml index c2b7f50e1..880ae3987 100644 --- a/queries/models/transito/schema.yml +++ b/queries/models/transito/schema.yml @@ -59,6 +59,78 @@ models: - name: status_sne description: "Indicador de adesão do veículo ao SNE" + - name: autuacao_serpro + description: "Tabela com dados de autuações fornecidos pelo sistema CITRAN/IplanRio" + columns: + - name: data + description: "Data da autuação (Coluna de particionamento)" + - name: id_auto_infracao + description: "Código do auto de infração" + - name: datetime_autuacao + description: "Data e hora da autuação" + - name: data_limite_defesa_previa + description: "Data limite para defesa prévia" + - name: data_limite_recurso + description: "Data limite para recurso em primeira instância" + - name: descricao_situacao_autuacao + description: "Descrição da situação da autuação" + - name: status_infracao + description: "Descrição do status da infração" + - name: codigo_enquadramento + description: "Código da autuação" + - name: tipificacao_resumida + description: "Descrição da autuação" + - name: pontuacao + description: "Quantidade de pontos da autuação" + - name: gravidade + description: "Descrição da gravidade da autuação" + - name: amparo_legal + description: "Amparo legal da autuação" + - name: tipo_veiculo + description: "Tipo de veículo autuado" + - name: descricao_veiculo + description: "Marca/modelo do veículo" + - name: placa_veiculo + description: "Placa do veículo" + - name: ano_fabricacao_veiculo + description: "Ano de fabricação do veículo" + - name: ano_modelo_veiculo + description: "Ano do modelo do veículo" + - name: cor_veiculo + description: "Cor do veículo" + - name: especie_veiculo + description: "Espécie do veículo" + - name: uf_infrator + description: "Estado do condutor infrator (em caso de indicação de real condutor infrator)" + - name: uf_principal_condutor + description: "Estado do condutor principal do veículo" + - name: uf_proprietario + description: "Estado do proprietário do veículo" + - name: cep_proprietario + description: "CEP do proprietário do veículo [protegido]" + - name: valor_infracao + description: "Valor monetário da autuação (100%) [R$]" + - name: valor_pago + description: "Valor pago da autuação [R$]" + - name: id_autuador + description: "Código do órgão autuador" + - name: descricao_autuador + description: "Descrição da unidade de autuação" + - name: id_municipio_autuacao + description: "Código TOM do município da autuação" + - name: descricao_municipio + description: "Nome do município da autuação" + - name: uf_autuacao + description: "Sigla do estado da autuação" + - name: processo_defesa_autuacao + description: "Número do processo de defesa prévia" + - name: recurso_penalidade_multa + description: "Número do processo de recurso contra aplicação de penalidade de multa em primeira instância" + - name: processo_troca_real_infrator + description: "Número do processo de troca de real condutor infrator" + - name: status_sne + description: "Indicador de adesão do veículo ao SNE" + - name: autuacao description: "Tabela com dados de autuações (até abril/2023)" columns: diff --git a/queries/models/transito/staging/autuacao_serpro.sql b/queries/models/transito/staging/autuacao_serpro.sql new file mode 100644 index 000000000..d9ddaceac --- /dev/null +++ b/queries/models/transito/staging/autuacao_serpro.sql @@ -0,0 +1,45 @@ + +{{ config( + materialized='view' + ) +}} + + +SELECT + DATE(data) AS data, + auinf_num_auto AS id_auto_infracao, + DATETIME(PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', REGEXP_REPLACE(SAFE_CAST(JSON_VALUE(content, '$.auinf_dt_infracao') AS STRING), r'\.\d+', '')), 'America/Sao_Paulo') AS datetime_autuacao, + IF(JSON_VALUE(content, '$.auinf_dt_limite_defesa_previa') != '', SAFE_CAST(PARSE_DATE('%Y-%m-%d', JSON_VALUE(content,'$.auinf_dt_limite_defesa_previa')) AS STRING), NULL) AS data_limite_defesa_previa, + IF(JSON_VALUE(content, '$.auinf_dt_limite_recurso') != '', SAFE_CAST(PARSE_DATE('%Y-%m-%d', JSON_VALUE(content,'$.auinf_dt_limite_recurso')) AS STRING), NULL) AS data_limite_recurso, + SAFE_CAST(JSON_VALUE(content,'$.stat_dsc_status_ai') AS STRING) AS descricao_situacao_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.stfu_dsc_status_fluxo_ai') AS STRING) AS status_infracao, + SAFE_CAST(JSON_VALUE(content,'$.htpi_cod_tipo_infracao') AS STRING) AS codigo_enquadramento, + SAFE_CAST(JSON_VALUE(content,'$.htpi_dsc_tipo_infracao') AS STRING) AS tipificacao_resumida, + SAFE_CAST(JSON_VALUE(content,'$.htpi_pontosdainfracao') AS STRING) AS pontuacao, + SAFE_CAST(JSON_VALUE(content,'$.hgrav_descricao') AS STRING) AS gravidade, + SAFE_CAST(JSON_VALUE(content,'$.htpi_amparo_legal') AS STRING) AS amparo_legal, + SAFE_CAST(JSON_VALUE(content,'$.auinf_vei_tipo') AS STRING) AS tipo_veiculo, + SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_marca_modelo_informado') AS STRING) AS descricao_veiculo, + SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_placa') AS STRING) AS placa_veiculo, + SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_ano_fabricacao') AS STRING) AS ano_fabricacao_veiculo, + SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_ano_modelo') AS STRING) AS ano_modelo_veiculo, + SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_cor_desc') AS STRING) AS cor_veiculo, + SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_especie_desc') AS STRING) AS especie_veiculo, + SAFE_CAST(JSON_VALUE(content,'$.uf_infrator') AS STRING) AS uf_infrator, + SAFE_CAST(JSON_VALUE(content,'$.uf_princ_cond') AS STRING) AS uf_principal_condutor, + SAFE_CAST(JSON_VALUE(content,'$.uf_prop_orig') AS STRING) AS uf_proprietario, + SAFE_CAST(JSON_VALUE(content,'$.cep_proprietario') AS STRING) AS cep_proprietario, + SAFE_CAST(JSON_VALUE(content,'$.auinf_infracao_valor') AS NUMERIC) AS valor_infracao, + SAFE_CAST(JSON_VALUE(content,'$.pag_valor') AS NUMERIC) AS valor_pago, + SAFE_CAST(JSON_VALUE(content,'$.auinf_id_orgao') AS STRING) AS id_autuador, + SAFE_CAST(JSON_VALUE(content,'$.unaut_dsc_unidade') AS STRING) AS descricao_autuador, + SAFE_CAST(JSON_VALUE(content,'$.id_municipio') AS STRING) AS id_municipio_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.mun_nome') AS STRING) AS descricao_municipio, + SAFE_CAST(JSON_VALUE(content,'$.uf_sigla') AS STRING) AS uf_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.defp_num_processo') AS STRING) AS processo_defesa_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.canc_num_processo') AS STRING) AS recurso_penalidade_multa, + SAFE_CAST(JSON_VALUE(content,'$.ri_proc_nr') AS STRING) AS processo_troca_real_infrator, + SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_adesao_sne_indicador') AS STRING) AS status_sne +FROM + {{ source('infracao_staging','autuacoes_serpro') }} + From 0aeb6da734ef5501756b5e5041ddae3159750014 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 17:39:21 -0300 Subject: [PATCH 02/54] adiciona CTE serpro --- queries/models/transito/autuacao.sql | 92 +++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 2 deletions(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index c78b19d52..e32363f76 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -54,8 +54,53 @@ WITH citran AS ( WHERE data BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") {% endif %} +), +serpro AS ( + SELECT + data, + id_auto_infracao, + datetime_autuacao, + data_limite_defesa_previa, + data_limite_recurso, + descricao_situacao_autuacao, + IF(status_infracao != "", status_infracao, NULL) AS status_infracao, + IF(codigo_enquadramento != "", codigo_enquadramento, NULL) AS codigo_enquadramento, + IF(tipificacao_resumida != "", tipificacao_resumida, NULL) AS tipificacao_resumida, + IF(pontuacao != "", pontuacao, NULL) AS pontuacao, + NULL AS gravidade, + NULL AS amparo_legal, + IF(tipo_veiculo != "", tipo_veiculo, NULL) AS tipo_veiculo, + IF(descricao_veiculo != "", descricao_veiculo, NULL) AS descricao_veiculo, + NULL AS placa_veiculo, + NULL AS ano_fabricacao_veiculo, + NULL AS ano_modelo_veiculo, + NULL AS cor_veiculo, + IF(especie_veiculo != "", especie_veiculo, NULL) AS especie_veiculo, + NULL AS uf_infrator, + NULL AS uf_principal_condutor, + IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, + IF(cep_proprietario != "", cep_proprietario, NULL) AS cep_proprietario, + valor_infracao, + valor_pago, + SAFE_CAST(NULL AS STRING) AS data_pagamento, + COALESCE(id_autuador, "260010") AS id_autuador, + IF(descricao_autuador != "", descricao_autuador, NULL) AS descricao_autuador, + COALESCE(id_municipio_autuacao,"6001") AS id_municipio_autuacao, + COALESCE(descricao_municipio, "RIO DE JANEIRO") AS descricao_municipio, + COALESCE(uf_autuacao,"RJ") AS uf_autuacao, + NULL AS cep_autuacao, + NULL AS tile_autuacao, + processo_defesa_autuacao, + recurso_penalidade_multa, + processo_troca_real_infrator, + IF(status_sne = "1.0", TRUE, FALSE) AS status_sne, + "SERPRO" AS fonte + FROM {{ ref('autuacao_serpro') }} + {% if is_incremental() %} + WHERE + data BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") + {% endif %} ) - SELECT data, TO_HEX(SHA256(CONCAT(GENERATE_UUID(), id_auto_infracao))) AS id_autuacao, @@ -97,4 +142,47 @@ SELECT status_sne, fonte FROM - citran \ No newline at end of file + citran +UNION ALL +SELECT + data, + TO_HEX(SHA256(CONCAT(GENERATE_UUID(), id_auto_infracao))) AS id_autuacao, + id_auto_infracao, + datetime_autuacao, + data_limite_defesa_previa, + data_limite_recurso, + descricao_situacao_autuacao, + status_infracao, + codigo_enquadramento, + tipificacao_resumida, + pontuacao, + gravidade, + amparo_legal, + tipo_veiculo, + descricao_veiculo, + placa_veiculo, + ano_fabricacao_veiculo, + ano_modelo_veiculo, + cor_veiculo, + especie_veiculo, + uf_infrator, + uf_principal_condutor, + uf_proprietario, + cep_proprietario, + valor_infracao, + valor_pago, + data_pagamento, + id_autuador, + descricao_autuador, + id_municipio_autuacao, + descricao_municipio, + uf_autuacao, + cep_autuacao, + tile_autuacao, + processo_defesa_autuacao, + recurso_penalidade_multa, + processo_troca_real_infrator, + status_sne, + fonte +FROM + serpro \ No newline at end of file From d2a1ee7f01f081dedef5de1a7dc5e710eee68900 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 17:39:50 -0300 Subject: [PATCH 03/54] cria task get_timestamp_range --- pipelines/tasks.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/pipelines/tasks.py b/pipelines/tasks.py index c91fc7cd0..fa68f21ac 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """Module containing general purpose tasks""" -from datetime import datetime -from typing import Any, Union +from datetime import datetime, timedelta +from typing import Any, List, Union import prefect from prefect import task @@ -271,3 +271,27 @@ def run_dbt_selector( dbt_logs = dbt_task.run() log("\n".join(dbt_logs)) + + +@task +def get_timestamp_range(start_date: str = None, end_date: str = None) -> List[str]: + """ + Generates a list of all days between two given dates (inclusive). + + Args: + start_date (str): The start date as a string in the format 'YYYY-MM-DD'. + end_date (str): The end date as a string in the format 'YYYY-MM-DD'. + """ + + start_date_dt = datetime.strptime(start_date, "%Y-%m-%d") + end_date_dt = datetime.strptime(end_date, "%Y-%m-%d") + + timestamps = [] + if start_date is None or end_date is None: + return None + + while start_date_dt <= end_date_dt: + timestamps.append(start_date_dt) + start_date_dt += timedelta(days=1) + + return timestamps From 594e3147b190e0a75c0d2908071cf0dad963d637 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 17:40:44 -0300 Subject: [PATCH 04/54] cria flow serpro_captura --- pipelines/serpro/constants.py | 31 ++++++++++ pipelines/serpro/flows.py | 107 +++++++++++++++++++++++++++++++--- pipelines/serpro/tasks.py | 36 +++++++++++- 3 files changed, 165 insertions(+), 9 deletions(-) create mode 100644 pipelines/serpro/constants.py diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py new file mode 100644 index 000000000..8a157a054 --- /dev/null +++ b/pipelines/serpro/constants.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +""" +Constant values for rj_smtr serpro +""" + +from enum import Enum + + +class constants(Enum): # pylint: disable=c0103 + """ + Constant values for rj_smtr serpro + """ + + INFRACAO_DATASET_ID = "infracao" + AUTUACAO_SERPRO_TABLE_ID = "autuacao_serpro" + AUTUACAO_MATERIALIZACAO_DATASET_ID = "transito" + AUTUACAO_MATERIALIZACAO_TABLE_ID = "autuacao" + + INFRACAO_PRIVATE_BUCKET = "rj-smtr-infracao-private" + + SERPRO_CAPTURE_PARAMS = { + "query": """ + SELECT + * + FROM + dbpro_radar_view_SMTR_VBL.tb_infracao_view + WHERE + TO_DATE(auinf_dt_infracao, 'YYYY-MM-DD') = '{date}' + """, + "primary_key": ["auinf_num_auto"], + } diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index 2e0585b29..196d51dcb 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -1,19 +1,112 @@ # -*- coding: utf-8 -*- +from prefect import Parameter, case from prefect.run_configs import KubernetesRun from prefect.storage import GCS +from prefect.tasks.control_flow import merge +from prefect.utilities.edges import unmapped from prefeitura_rio.pipelines_utils.custom import Flow +from prefeitura_rio.pipelines_utils.state_handlers import ( + handler_initialize_sentry, + handler_inject_bd_credentials, +) from pipelines.constants import constants as smtr_constants -from pipelines.serpro.tasks import wait_sleeping +from pipelines.migration.tasks import ( + create_date_hour_partition, + create_local_partition_path, + get_current_timestamp, + get_now_time, + parse_timestamp_to_string, + rename_current_flow_run_now_time, + run_dbt_model, + transform_raw_to_nested_structure, + unpack_mapped_results_nout2, + upload_raw_data_to_gcs, + upload_staging_data_to_gcs, +) +from pipelines.serpro.constants import constants +from pipelines.serpro.tasks import get_db_object, get_raw_serpro from pipelines.serpro.utils import handler_setup_serpro +from pipelines.tasks import get_timestamp_range + +with Flow("SMTR: SERPRO - Captura/Tratamento") as serpro_captura: + start_date = Parameter("start_date", default=None) + end_date = Parameter("end_date", default=None) + + rename_flow_run = rename_current_flow_run_now_time( + prefix=serpro_captura.name + " ", + now_time=get_now_time(), + ) + + capture_timestamps = get_timestamp_range(start_date, end_date) + with case(start_date, None): + current_timestamp = get_current_timestamp() + + timestamps = merge(current_timestamp, capture_timestamps) + + partitions = create_date_hour_partition.map( + timestamps, + partition_date_only=unmapped(True), + ) + + filenames = parse_timestamp_to_string.map(timestamps) + + local_filepaths = create_local_partition_path.map( + dataset_id=unmapped(constants.INFRACAO_DATASET_ID.value), + table_id=unmapped(constants.AUTUACAO_SERPRO_TABLE_ID.value), + partitions=partitions, + filename=filenames, + ) + + jdbc = get_db_object() + + raw_filepaths = get_raw_serpro.map( + jdbc=unmapped(jdbc), timestamp=timestamps, local_filepath=local_filepaths + ) + + transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map( + raw_filepath=raw_filepaths, + filepath=local_filepaths, + primary_key=constants.SERPRO_CAPTURE_PARAMS.value["primary_key"], + timestamp=timestamps, + error=unmapped(None), + ) + + errors, treated_filepaths = unpack_mapped_results_nout2( + mapped_results=transform_raw_to_nested_structure_results + ) + + errors = upload_raw_data_to_gcs.map( + dataset_id=unmapped(constants.INFRACAO_DATASET_ID.value), + table_id=unmapped(constants.AUTUACAO_SERPRO_TABLE_ID.value), + raw_filepath=raw_filepaths, + partitions=partitions, + error=unmapped(None), + ) + + wait_captura_true = upload_staging_data_to_gcs.map( + dataset_id=unmapped(constants.INFRACAO_DATASET_ID.value), + table_id=unmapped(constants.AUTUACAO_SERPRO_TABLE_ID.value), + staging_filepath=treated_filepaths, + partitions=partitions, + timestamp=timestamps, + error=errors, + ) -with Flow("SMTR - Teste Conexão Serpro") as flow: - # setup_serpro() - wait_sleeping() + wait_run_dbt_model = run_dbt_model( + dataset_id=constants.AUTUACAO_MATERIALIZACAO_DATASET_ID.value, + table_id=constants.AUTUACAO_MATERIALIZACAO_TABLE_ID.value, + _vars=[{"date_range_start": start_date, "date_range_end": end_date}], + upstream_tasks=[wait_captura_true], + ) -flow.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value) -flow.run_config = KubernetesRun( +serpro_captura.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value) +serpro_captura.run_config = KubernetesRun( image=smtr_constants.DOCKER_IMAGE_FEDORA.value, labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value], ) -flow.state_handlers = [handler_setup_serpro] +serpro_captura.state_handlers = [ + handler_setup_serpro, + handler_inject_bd_credentials, + handler_initialize_sentry, +] diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index aec6afe34..40aa30b44 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -1,16 +1,48 @@ # -*- coding: utf-8 -*- +import csv +import os +from datetime import datetime, timedelta from time import sleep from prefect import task +from pipelines.serpro.constants import constants from pipelines.utils.jdbc import JDBC +from pipelines.utils.secret import get_secret +from pipelines.utils.utils import log @task -def wait_sleeping(interval_seconds: int = 54000, wait=None): +def wait_sleeping(interval_seconds: int = 54000): sleep(interval_seconds) -@task +@task(checkpoint=False, max_retries=3, retry_delay=timedelta(seconds=20)) def get_db_object(secret_path="radar_serpro", environment: str = "dev"): + jar_path = get_secret(secret_path=secret_path, environment=environment)["jars"] + + if not os.path.exists(jar_path): + raise Exception(f"Arquivo JAR '{jar_path}' não encontrado.") + return JDBC(db_params_secret_path=secret_path, environment=environment) + + +@task(checkpoint=False, nout=2) +def get_raw_serpro(jdbc: JDBC, timestamp: str, local_filepath: str) -> str: + date = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").date() + raw_filepath = local_filepath.format(mode="raw", filetype="csv") + + query = constants.SERPRO_CAPTURE_PARAMS.value["query"].format(date=date.strftime("%Y-%m-%d")) + + jdbc.execute_query(query) + columns = jdbc.get_columns() + + rows = jdbc.fetch_all() + + with open(raw_filepath, "w", newline="") as csvfile: + writer = csv.writer(csvfile) + writer.writerow(columns) + writer.writerows(rows) + + log(f"Raw data saved to: {raw_filepath}") + return raw_filepath From 923613fe001a88028153663c7458423b553ed72f Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 18:08:17 -0300 Subject: [PATCH 05/54] corrige tipo do timestamp --- pipelines/serpro/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index 40aa30b44..12ba02285 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -28,8 +28,8 @@ def get_db_object(secret_path="radar_serpro", environment: str = "dev"): @task(checkpoint=False, nout=2) -def get_raw_serpro(jdbc: JDBC, timestamp: str, local_filepath: str) -> str: - date = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").date() +def get_raw_serpro(jdbc: JDBC, timestamp: datetime, local_filepath: str) -> str: + date = timestamp.date() raw_filepath = local_filepath.format(mode="raw", filetype="csv") query = constants.SERPRO_CAPTURE_PARAMS.value["query"].format(date=date.strftime("%Y-%m-%d")) From c5d34f5f3cbd1af94b612676938d867ff052e4df Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 18:57:02 -0300 Subject: [PATCH 06/54] altera query --- pipelines/serpro/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 8a157a054..04aed523c 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -25,7 +25,7 @@ class constants(Enum): # pylint: disable=c0103 FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view WHERE - TO_DATE(auinf_dt_infracao, 'YYYY-MM-DD') = '{date}' + CAST(auinf_dt_infracao AS DATE) = CAST('{date}' AS DATE) """, "primary_key": ["auinf_num_auto"], } From 32d33b54c0b7ac5ba52da06e1099136728b7289c Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 19:13:55 -0300 Subject: [PATCH 07/54] altera where --- pipelines/serpro/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 04aed523c..3afa31a02 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -25,7 +25,7 @@ class constants(Enum): # pylint: disable=c0103 FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view WHERE - CAST(auinf_dt_infracao AS DATE) = CAST('{date}' AS DATE) + auinf_dt_infracao= '{date}' """, "primary_key": ["auinf_num_auto"], } From 3d03c01d53f4872b2bf30a9274c8d7c51a51771b Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 19:31:34 -0300 Subject: [PATCH 08/54] add mkdir --- pipelines/serpro/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index 12ba02285..504750716 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -2,6 +2,7 @@ import csv import os from datetime import datetime, timedelta +from pathlib import Path from time import sleep from prefect import task @@ -31,6 +32,7 @@ def get_db_object(secret_path="radar_serpro", environment: str = "dev"): def get_raw_serpro(jdbc: JDBC, timestamp: datetime, local_filepath: str) -> str: date = timestamp.date() raw_filepath = local_filepath.format(mode="raw", filetype="csv") + Path(raw_filepath).parent.mkdir(parents=True, exist_ok=True) query = constants.SERPRO_CAPTURE_PARAMS.value["query"].format(date=date.strftime("%Y-%m-%d")) From 2e52ca301295d16c2f7d6ea626c965df37d3e4b6 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 2 Oct 2024 19:47:25 -0300 Subject: [PATCH 09/54] add bucket_name --- pipelines/serpro/flows.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index 196d51dcb..d6abd1e9c 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -82,6 +82,7 @@ raw_filepath=raw_filepaths, partitions=partitions, error=unmapped(None), + bucket_name=unmapped(constants.INFRACAO_PRIVATE_BUCKET.value), ) wait_captura_true = upload_staging_data_to_gcs.map( @@ -91,6 +92,7 @@ partitions=partitions, timestamp=timestamps, error=errors, + bucket_name=unmapped(constants.INFRACAO_PRIVATE_BUCKET.value), ) wait_run_dbt_model = run_dbt_model( From 989e6d93803e8ae7fe8c978b5ebb426523ed3749 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 12:07:27 -0300 Subject: [PATCH 10/54] corrige where --- pipelines/serpro/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 3afa31a02..cf5040b29 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -25,7 +25,7 @@ class constants(Enum): # pylint: disable=c0103 FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view WHERE - auinf_dt_infracao= '{date}' + SUBSTRING(auinf_dt_infracao, 1, 10) = '{date}' """, "primary_key": ["auinf_num_auto"], } From c486e639f9752df4dc5c4175020c80ea409bbc05 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 12:10:09 -0300 Subject: [PATCH 11/54] add dbt deps --- Dockerfile-fedora | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Dockerfile-fedora b/Dockerfile-fedora index 49ebdcc87..ca3a9ac08 100644 --- a/Dockerfile-fedora +++ b/Dockerfile-fedora @@ -23,3 +23,5 @@ WORKDIR /app COPY . . RUN python3.10 -m pip install --prefer-binary --no-cache-dir -U . RUN python3 -m pip install jaydebeapi +WORKDIR /app/queries +RUN dbt deps \ No newline at end of file From 807f7bb8e67095511a3cf1b35bba63528bbb8150 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 12:33:22 -0300 Subject: [PATCH 12/54] aumenta max_retries e retry_delay da get_db_object --- pipelines/serpro/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index 504750716..a971111cb 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -18,7 +18,7 @@ def wait_sleeping(interval_seconds: int = 54000): sleep(interval_seconds) -@task(checkpoint=False, max_retries=3, retry_delay=timedelta(seconds=20)) +@task(checkpoint=False, max_retries=5, retry_delay=timedelta(seconds=30)) def get_db_object(secret_path="radar_serpro", environment: str = "dev"): jar_path = get_secret(secret_path=secret_path, environment=environment)["jars"] From 88acb224582683c78add3e0cb673f564c089ed43 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 12:53:23 -0300 Subject: [PATCH 13/54] aumenta max_retries e retry_delay da get_db_object --- pipelines/serpro/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index a971111cb..e10354390 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -18,7 +18,7 @@ def wait_sleeping(interval_seconds: int = 54000): sleep(interval_seconds) -@task(checkpoint=False, max_retries=5, retry_delay=timedelta(seconds=30)) +@task(checkpoint=False, max_retries=10, retry_delay=timedelta(seconds=60)) def get_db_object(secret_path="radar_serpro", environment: str = "dev"): jar_path = get_secret(secret_path=secret_path, environment=environment)["jars"] From 76e6c205d76793a4f7a50e994337a46d0f7d0e42 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 13:23:19 -0300 Subject: [PATCH 14/54] aumenta retry_delay da get_db_object --- pipelines/serpro/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index e10354390..a5f961b5d 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -18,7 +18,7 @@ def wait_sleeping(interval_seconds: int = 54000): sleep(interval_seconds) -@task(checkpoint=False, max_retries=10, retry_delay=timedelta(seconds=60)) +@task(checkpoint=False, max_retries=10, retry_delay=timedelta(seconds=300)) def get_db_object(secret_path="radar_serpro", environment: str = "dev"): jar_path = get_secret(secret_path=secret_path, environment=environment)["jars"] From 3ced5d289ecf9f91fbc542e715e4a00bf988fd27 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 14:26:31 -0300 Subject: [PATCH 15/54] add path to setup_serpro --- pipelines/serpro/utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pipelines/serpro/utils.py b/pipelines/serpro/utils.py index d0ec87da0..bdc5c2b7d 100644 --- a/pipelines/serpro/utils.py +++ b/pipelines/serpro/utils.py @@ -10,10 +10,12 @@ def setup_serpro(secret_path: str = "radar_serpro"): data = get_secret(secret_path=secret_path)["setup.sh"] log("Got Secret") - os.popen("touch setup.sh") - with open("setup.sh", "w") as f: + + path = "/app/setup.sh" + os.popen(f"touch {path}") + with open(path, "w") as f: f.write(data) - return os.popen("sh setup.sh") + return os.popen(f"sh {path}") def handler_setup_serpro(obj, old_state: State, new_state: State) -> State: From d244193e6c5447be99b33d88e69c18a3e6f23e59 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 14:40:34 -0300 Subject: [PATCH 16/54] add workdir --- Dockerfile-fedora | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile-fedora b/Dockerfile-fedora index ca3a9ac08..8f46cc584 100644 --- a/Dockerfile-fedora +++ b/Dockerfile-fedora @@ -24,4 +24,5 @@ COPY . . RUN python3.10 -m pip install --prefer-binary --no-cache-dir -U . RUN python3 -m pip install jaydebeapi WORKDIR /app/queries -RUN dbt deps \ No newline at end of file +RUN dbt deps +WORKDIR /app \ No newline at end of file From 84685dde84ed89014a867572caccdb52860acbcc Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 14:41:43 -0300 Subject: [PATCH 17/54] volta setup_serpro --- pipelines/serpro/utils.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pipelines/serpro/utils.py b/pipelines/serpro/utils.py index bdc5c2b7d..d0ec87da0 100644 --- a/pipelines/serpro/utils.py +++ b/pipelines/serpro/utils.py @@ -10,12 +10,10 @@ def setup_serpro(secret_path: str = "radar_serpro"): data = get_secret(secret_path=secret_path)["setup.sh"] log("Got Secret") - - path = "/app/setup.sh" - os.popen(f"touch {path}") - with open(path, "w") as f: + os.popen("touch setup.sh") + with open("setup.sh", "w") as f: f.write(data) - return os.popen(f"sh {path}") + return os.popen("sh setup.sh") def handler_setup_serpro(obj, old_state: State, new_state: State) -> State: From 4317bd595f0a80c4b24e61f41a54d729814aaf8c Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 15:19:08 -0300 Subject: [PATCH 18/54] retorna max_retries e retry_delay da get_db_object --- pipelines/serpro/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index a5f961b5d..504750716 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -18,7 +18,7 @@ def wait_sleeping(interval_seconds: int = 54000): sleep(interval_seconds) -@task(checkpoint=False, max_retries=10, retry_delay=timedelta(seconds=300)) +@task(checkpoint=False, max_retries=3, retry_delay=timedelta(seconds=20)) def get_db_object(secret_path="radar_serpro", environment: str = "dev"): jar_path = get_secret(secret_path=secret_path, environment=environment)["jars"] From 59acf79594ccc71292312c34b947f1c62404e885 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 15:54:46 -0300 Subject: [PATCH 19/54] import subprocess --- pipelines/serpro/utils.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pipelines/serpro/utils.py b/pipelines/serpro/utils.py index d0ec87da0..1072df646 100644 --- a/pipelines/serpro/utils.py +++ b/pipelines/serpro/utils.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -import os +# import os +import subprocess from prefect.engine.state import State @@ -10,10 +11,18 @@ def setup_serpro(secret_path: str = "radar_serpro"): data = get_secret(secret_path=secret_path)["setup.sh"] log("Got Secret") - os.popen("touch setup.sh") + # os.popen("touch setup.sh") + subprocess.run(["touch", "setup.sh"]) with open("setup.sh", "w") as f: f.write(data) - return os.popen("sh setup.sh") + + result = subprocess.run(["sh", "setup.sh"]) + + if result.returncode == 0: + log("setup.sh executou corretamente") + + return result + # return os.popen("sh setup.sh") def handler_setup_serpro(obj, old_state: State, new_state: State) -> State: From 0cec8e02f1ea1b2db0b40f22b6bc8fa7945a5f7d Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 16:19:00 -0300 Subject: [PATCH 20/54] add pre_treatment_reader_args --- pipelines/serpro/constants.py | 1 + pipelines/serpro/flows.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index cf5040b29..6341eca4f 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -28,4 +28,5 @@ class constants(Enum): # pylint: disable=c0103 SUBSTRING(auinf_dt_infracao, 1, 10) = '{date}' """, "primary_key": ["auinf_num_auto"], + "pre_treatment_reader_args": {"dtype": "object"}, } diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index d6abd1e9c..ac4897785 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -67,8 +67,9 @@ transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map( raw_filepath=raw_filepaths, filepath=local_filepaths, - primary_key=constants.SERPRO_CAPTURE_PARAMS.value["primary_key"], + primary_key=unmapped(constants.SERPRO_CAPTURE_PARAMS.value["primary_key"]), timestamp=timestamps, + reader_args=unmapped(constants.SERPRO_CAPTURE_PARAMS.value["pre_treatment_reader_args"]), error=unmapped(None), ) From fce39c3dea243f48a9a5419c940fd8b4a9ab2002 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 16:42:32 -0300 Subject: [PATCH 21/54] comenta bucket_name --- pipelines/serpro/flows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index ac4897785..c1980f2c2 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -83,7 +83,7 @@ raw_filepath=raw_filepaths, partitions=partitions, error=unmapped(None), - bucket_name=unmapped(constants.INFRACAO_PRIVATE_BUCKET.value), + # bucket_name=unmapped(constants.INFRACAO_PRIVATE_BUCKET.value), ) wait_captura_true = upload_staging_data_to_gcs.map( @@ -93,7 +93,7 @@ partitions=partitions, timestamp=timestamps, error=errors, - bucket_name=unmapped(constants.INFRACAO_PRIVATE_BUCKET.value), + # bucket_name=unmapped(constants.INFRACAO_PRIVATE_BUCKET.value), ) wait_run_dbt_model = run_dbt_model( From b4ce077297859ac5388274dc07d5ab529162aa5c Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 17:01:55 -0300 Subject: [PATCH 22/54] add exception --- pipelines/serpro/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/serpro/utils.py b/pipelines/serpro/utils.py index 1072df646..6c58845e7 100644 --- a/pipelines/serpro/utils.py +++ b/pipelines/serpro/utils.py @@ -10,8 +10,7 @@ def setup_serpro(secret_path: str = "radar_serpro"): data = get_secret(secret_path=secret_path)["setup.sh"] - log("Got Secret") - # os.popen("touch setup.sh") + subprocess.run(["touch", "setup.sh"]) with open("setup.sh", "w") as f: f.write(data) @@ -20,9 +19,10 @@ def setup_serpro(secret_path: str = "radar_serpro"): if result.returncode == 0: log("setup.sh executou corretamente") + else: + raise Exception(f"Error executing setup.sh: {result.stderr}") return result - # return os.popen("sh setup.sh") def handler_setup_serpro(obj, old_state: State, new_state: State) -> State: From 16c3bedbddd7bd34c161e2872eeb0f8796234019 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 17:03:12 -0300 Subject: [PATCH 23/54] altera get_db_object --- pipelines/serpro/tasks.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index 504750716..29fc1b1cf 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- import csv -import os -from datetime import datetime, timedelta +from datetime import datetime from pathlib import Path from time import sleep @@ -9,7 +8,6 @@ from pipelines.serpro.constants import constants from pipelines.utils.jdbc import JDBC -from pipelines.utils.secret import get_secret from pipelines.utils.utils import log @@ -18,13 +16,8 @@ def wait_sleeping(interval_seconds: int = 54000): sleep(interval_seconds) -@task(checkpoint=False, max_retries=3, retry_delay=timedelta(seconds=20)) +@task(checkpoint=False) def get_db_object(secret_path="radar_serpro", environment: str = "dev"): - jar_path = get_secret(secret_path=secret_path, environment=environment)["jars"] - - if not os.path.exists(jar_path): - raise Exception(f"Arquivo JAR '{jar_path}' não encontrado.") - return JDBC(db_params_secret_path=secret_path, environment=environment) From 4b48791de04791464aeff298a51b72ed863f7d63 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 17:33:59 -0300 Subject: [PATCH 24/54] teste dev --- queries/profiles.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queries/profiles.yml b/queries/profiles.yml index e67088d2d..054a713ef 100644 --- a/queries/profiles.yml +++ b/queries/profiles.yml @@ -78,4 +78,4 @@ queries: spark.executor.instances: "2" spark.driver.memory: 4g spark.driver.memoryOverhead: 1g - target: prod \ No newline at end of file + target: dev \ No newline at end of file From 7a97bc75511517f81ecaf03c572ab3b90e3dbefd Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 17:54:22 -0300 Subject: [PATCH 25/54] hmg --- queries/profiles.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queries/profiles.yml b/queries/profiles.yml index 054a713ef..3ac185e4d 100644 --- a/queries/profiles.yml +++ b/queries/profiles.yml @@ -78,4 +78,4 @@ queries: spark.executor.instances: "2" spark.driver.memory: 4g spark.driver.memoryOverhead: 1g - target: dev \ No newline at end of file + target: hmg \ No newline at end of file From e69b9a5b8daea49aad1eb5488ad482c1aa6409ac Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 18:14:57 -0300 Subject: [PATCH 26/54] altera modelos --- queries/models/transito/autuacao.sql | 16 ++++++++-------- .../models/transito/staging/autuacao_serpro.sql | 1 - 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index e32363f76..23789c34f 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -67,17 +67,17 @@ serpro AS ( IF(codigo_enquadramento != "", codigo_enquadramento, NULL) AS codigo_enquadramento, IF(tipificacao_resumida != "", tipificacao_resumida, NULL) AS tipificacao_resumida, IF(pontuacao != "", pontuacao, NULL) AS pontuacao, - NULL AS gravidade, - NULL AS amparo_legal, + gravidade, + amparo_legal, IF(tipo_veiculo != "", tipo_veiculo, NULL) AS tipo_veiculo, IF(descricao_veiculo != "", descricao_veiculo, NULL) AS descricao_veiculo, - NULL AS placa_veiculo, - NULL AS ano_fabricacao_veiculo, - NULL AS ano_modelo_veiculo, - NULL AS cor_veiculo, + placa_veiculo, + ano_fabricacao_veiculo, + ano_modelo_veiculo, + cor_veiculo, IF(especie_veiculo != "", especie_veiculo, NULL) AS especie_veiculo, - NULL AS uf_infrator, - NULL AS uf_principal_condutor, + uf_infrator, + uf_principal_condutor, IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, IF(cep_proprietario != "", cep_proprietario, NULL) AS cep_proprietario, valor_infracao, diff --git a/queries/models/transito/staging/autuacao_serpro.sql b/queries/models/transito/staging/autuacao_serpro.sql index d9ddaceac..44049788f 100644 --- a/queries/models/transito/staging/autuacao_serpro.sql +++ b/queries/models/transito/staging/autuacao_serpro.sql @@ -28,7 +28,6 @@ SELECT SAFE_CAST(JSON_VALUE(content,'$.uf_infrator') AS STRING) AS uf_infrator, SAFE_CAST(JSON_VALUE(content,'$.uf_princ_cond') AS STRING) AS uf_principal_condutor, SAFE_CAST(JSON_VALUE(content,'$.uf_prop_orig') AS STRING) AS uf_proprietario, - SAFE_CAST(JSON_VALUE(content,'$.cep_proprietario') AS STRING) AS cep_proprietario, SAFE_CAST(JSON_VALUE(content,'$.auinf_infracao_valor') AS NUMERIC) AS valor_infracao, SAFE_CAST(JSON_VALUE(content,'$.pag_valor') AS NUMERIC) AS valor_pago, SAFE_CAST(JSON_VALUE(content,'$.auinf_id_orgao') AS STRING) AS id_autuador, From f1560f9eeba893d5b8fafb2b2c38688ff54b7cf2 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 18:19:03 -0300 Subject: [PATCH 27/54] upstream --- pipelines/serpro/flows.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index c1980f2c2..6a91a77cb 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -100,6 +100,7 @@ dataset_id=constants.AUTUACAO_MATERIALIZACAO_DATASET_ID.value, table_id=constants.AUTUACAO_MATERIALIZACAO_TABLE_ID.value, _vars=[{"date_range_start": start_date, "date_range_end": end_date}], + upstream=True, upstream_tasks=[wait_captura_true], ) From 47c016fbd2d0b9154ff8bedf60990b00f751f869 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 18:45:37 -0300 Subject: [PATCH 28/54] corrige CTE serpro.cep_proprietario --- queries/models/transito/autuacao.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index 23789c34f..74f10071c 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -79,7 +79,7 @@ serpro AS ( uf_infrator, uf_principal_condutor, IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, - IF(cep_proprietario != "", cep_proprietario, NULL) AS cep_proprietario, + NULL AS cep_proprietario, valor_infracao, valor_pago, SAFE_CAST(NULL AS STRING) AS data_pagamento, From 9307a509374a8df0540bee0f1d84986d88f3a50c Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 3 Oct 2024 18:54:01 -0300 Subject: [PATCH 29/54] add SAFE_CAST --- queries/models/transito/autuacao.sql | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index 74f10071c..9bb7068c1 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -21,17 +21,17 @@ WITH citran AS ( IF(codigo_enquadramento != "", codigo_enquadramento, NULL) AS codigo_enquadramento, IF(tipificacao_resumida != "", tipificacao_resumida, NULL) AS tipificacao_resumida, IF(pontuacao != "", pontuacao, NULL) AS pontuacao, - NULL AS gravidade, - NULL AS amparo_legal, + SAFE_CAST(NULL AS STRING) AS gravidade, + SAFE_CAST(NULL AS STRING) AS amparo_legal, IF(tipo_veiculo != "", tipo_veiculo, NULL) AS tipo_veiculo, IF(descricao_veiculo != "", descricao_veiculo, NULL) AS descricao_veiculo, - NULL AS placa_veiculo, - NULL AS ano_fabricacao_veiculo, - NULL AS ano_modelo_veiculo, - NULL AS cor_veiculo, + SAFE_CAST(NULL AS STRING) AS placa_veiculo, + SAFE_CAST(NULL AS STRING) AS ano_fabricacao_veiculo, + SAFE_CAST(NULL AS STRING) AS ano_modelo_veiculo, + SAFE_CAST(NULL AS STRING) AS cor_veiculo, IF(especie_veiculo != "", especie_veiculo, NULL) AS especie_veiculo, - NULL AS uf_infrator, - NULL AS uf_principal_condutor, + SAFE_CAST(NULL AS STRING) AS uf_infrator, + SAFE_CAST(NULL AS STRING) AS uf_principal_condutor, IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, IF(cep_proprietario != "", cep_proprietario, NULL) AS cep_proprietario, valor_infracao / 100 AS valor_infracao, @@ -79,7 +79,7 @@ serpro AS ( uf_infrator, uf_principal_condutor, IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, - NULL AS cep_proprietario, + SAFE_CAST(NULL AS STRING) AS cep_proprietario, valor_infracao, valor_pago, SAFE_CAST(NULL AS STRING) AS data_pagamento, From 1aa705967ec3548a972c8f8cf6185a515a63c3a0 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Fri, 4 Oct 2024 17:56:58 -0300 Subject: [PATCH 30/54] padroniza dados --- queries/models/transito/autuacao.sql | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index 9bb7068c1..12a8de41f 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -18,18 +18,29 @@ WITH citran AS ( data_limite_recurso, situacao_atual AS descricao_situacao_autuacao, IF(status_infracao != "", status_infracao, NULL) AS status_infracao, - IF(codigo_enquadramento != "", codigo_enquadramento, NULL) AS codigo_enquadramento, + SUBSTR(REPLACE(codigo_enquadramento, '-', ''), 1, 4) AS codigo_enquadramento, IF(tipificacao_resumida != "", tipificacao_resumida, NULL) AS tipificacao_resumida, - IF(pontuacao != "", pontuacao, NULL) AS pontuacao, - SAFE_CAST(NULL AS STRING) AS gravidade, + SAFE_CAST(SUBSTR(REGEXP_EXTRACT(pontuacao, r'\d+'), 2) AS STRING) AS pontuacao, + CASE + WHEN INITCAP(REGEXP_REPLACE(pontuacao, r'\d+', '')) = 'Media' THEN 'Média' + WHEN INITCAP(REGEXP_REPLACE(pontuacao, r'\d+', '')) = 'Gravissima' THEN 'Gravíssima' + ELSE INITCAP(REGEXP_REPLACE(pontuacao, r'\d+', '')) + END AS gravidade, SAFE_CAST(NULL AS STRING) AS amparo_legal, - IF(tipo_veiculo != "", tipo_veiculo, NULL) AS tipo_veiculo, + INITCAP(tipo_veiculo) AS tipo_veiculo, IF(descricao_veiculo != "", descricao_veiculo, NULL) AS descricao_veiculo, SAFE_CAST(NULL AS STRING) AS placa_veiculo, SAFE_CAST(NULL AS STRING) AS ano_fabricacao_veiculo, SAFE_CAST(NULL AS STRING) AS ano_modelo_veiculo, SAFE_CAST(NULL AS STRING) AS cor_veiculo, - IF(especie_veiculo != "", especie_veiculo, NULL) AS especie_veiculo, + CASE + WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Misto', '0Misto') THEN 'Misto' + WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Passageir', '0Passageir', 'Passageiro', '0Passageiro') THEN 'Passageiro' + WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Tracao', '0Tracao', 'Tracao') THEN 'Tração' + WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Nao Inform', '0Nao Inform', 'Nao Informado', '0Nao Informado') THEN 'Não informado' + WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Carga', '0Carga') THEN 'Carga' + ELSE 'Inválido' + END AS especie_veiculo, SAFE_CAST(NULL AS STRING) AS uf_infrator, SAFE_CAST(NULL AS STRING) AS uf_principal_condutor, IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, @@ -66,16 +77,16 @@ serpro AS ( IF(status_infracao != "", status_infracao, NULL) AS status_infracao, IF(codigo_enquadramento != "", codigo_enquadramento, NULL) AS codigo_enquadramento, IF(tipificacao_resumida != "", tipificacao_resumida, NULL) AS tipificacao_resumida, - IF(pontuacao != "", pontuacao, NULL) AS pontuacao, + SUBSTR(pontuacao, 1, 1) AS pontuacao, gravidade, amparo_legal, - IF(tipo_veiculo != "", tipo_veiculo, NULL) AS tipo_veiculo, + INITCAP(tipo_veiculo) AS tipo_veiculo, IF(descricao_veiculo != "", descricao_veiculo, NULL) AS descricao_veiculo, placa_veiculo, ano_fabricacao_veiculo, ano_modelo_veiculo, cor_veiculo, - IF(especie_veiculo != "", especie_veiculo, NULL) AS especie_veiculo, + especie_veiculo, uf_infrator, uf_principal_condutor, IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, From d354197929a86d49b7a8c66980afc38363e195fc Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 9 Oct 2024 16:47:27 -0300 Subject: [PATCH 31/54] adiciona colunas serpro --- queries/models/transito/autuacao.sql | 10 +++++++--- queries/models/transito/staging/autuacao_serpro.sql | 8 ++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index 12a8de41f..f2738474f 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -18,7 +18,7 @@ WITH citran AS ( data_limite_recurso, situacao_atual AS descricao_situacao_autuacao, IF(status_infracao != "", status_infracao, NULL) AS status_infracao, - SUBSTR(REPLACE(codigo_enquadramento, '-', ''), 1, 4) AS codigo_enquadramento, + REPLACE(codigo_enquadramento, '-', '') AS codigo_enquadramento, IF(tipificacao_resumida != "", tipificacao_resumida, NULL) AS tipificacao_resumida, SAFE_CAST(SUBSTR(REGEXP_EXTRACT(pontuacao, r'\d+'), 2) AS STRING) AS pontuacao, CASE @@ -53,7 +53,7 @@ WITH citran AS ( "6001" AS id_municipio_autuacao, "RIO DE JANEIRO" AS descricao_municipio, "RJ" AS uf_autuacao, - NULL AS cep_autuacao, -- não padronizado na citran + endereco_autuacao AS cep_autuacao, NULL AS tile_autuacao, IF(processo_defesa_autuacao != "00000000" AND processo_defesa_autuacao != "" , processo_defesa_autuacao, NULL) AS processo_defesa_autuacao, IF(recurso_penalidade_multa != "00000000" AND recurso_penalidade_multa != "" , recurso_penalidade_multa, NULL) AS recurso_penalidade_multa, @@ -99,7 +99,11 @@ serpro AS ( COALESCE(id_municipio_autuacao,"6001") AS id_municipio_autuacao, COALESCE(descricao_municipio, "RIO DE JANEIRO") AS descricao_municipio, COALESCE(uf_autuacao,"RJ") AS uf_autuacao, - NULL AS cep_autuacao, + CASE + WHEN logradouro_autuacao IS NOT NULL THEN + RTRIM(REGEXP_REPLACE(CONCAT(logradouro_autuacao, ' ', bairro_autuacao, ' ', complemento), r'\s+', ' ')) + ELSE NULL + END AS cep_autuacao, NULL AS tile_autuacao, processo_defesa_autuacao, recurso_penalidade_multa, diff --git a/queries/models/transito/staging/autuacao_serpro.sql b/queries/models/transito/staging/autuacao_serpro.sql index 44049788f..d5a91e33c 100644 --- a/queries/models/transito/staging/autuacao_serpro.sql +++ b/queries/models/transito/staging/autuacao_serpro.sql @@ -35,7 +35,15 @@ SELECT SAFE_CAST(JSON_VALUE(content,'$.id_municipio') AS STRING) AS id_municipio_autuacao, SAFE_CAST(JSON_VALUE(content,'$.mun_nome') AS STRING) AS descricao_municipio, SAFE_CAST(JSON_VALUE(content,'$.uf_sigla') AS STRING) AS uf_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.auinf_local_ende_bairro') AS STRING) AS bairro_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.auinf_local_ende_cep') AS STRING) AS cep_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.auinf_local_ende_logradouro') AS STRING) AS logradouro_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.auinf_local_ende_numero') AS STRING) AS ende_numero_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.complemento') AS STRING) AS complemento, SAFE_CAST(JSON_VALUE(content,'$.defp_num_processo') AS STRING) AS processo_defesa_autuacao, + SAFE_CAST(JSON_VALUE(content,'$.rrso_num_processo') AS STRING) AS rrso_num_processo, + SAFE_CAST(JSON_VALUE(content,'$.cpa_num_processo') AS STRING) AS cpa_num_processo, + SAFE_CAST(JSON_VALUE(content,'$.susp_num_processo') AS STRING) AS susp_num_processo, SAFE_CAST(JSON_VALUE(content,'$.canc_num_processo') AS STRING) AS recurso_penalidade_multa, SAFE_CAST(JSON_VALUE(content,'$.ri_proc_nr') AS STRING) AS processo_troca_real_infrator, SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_adesao_sne_indicador') AS STRING) AS status_sne From 52e8052df2f2ff9a27f5483a4f8b29243ef34a2c Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 16 Dec 2024 10:52:53 -0300 Subject: [PATCH 32/54] add batch_size --- pipelines/serpro/tasks.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index 29fc1b1cf..7f97260d5 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -22,7 +22,9 @@ def get_db_object(secret_path="radar_serpro", environment: str = "dev"): @task(checkpoint=False, nout=2) -def get_raw_serpro(jdbc: JDBC, timestamp: datetime, local_filepath: str) -> str: +def get_raw_serpro( + jdbc: JDBC, timestamp: datetime, local_filepath: str, batch_size: int = 100000 +) -> str: date = timestamp.date() raw_filepath = local_filepath.format(mode="raw", filetype="csv") Path(raw_filepath).parent.mkdir(parents=True, exist_ok=True) @@ -32,7 +34,7 @@ def get_raw_serpro(jdbc: JDBC, timestamp: datetime, local_filepath: str) -> str: jdbc.execute_query(query) columns = jdbc.get_columns() - rows = jdbc.fetch_all() + rows = jdbc.fetch_batch(batch_size=batch_size) with open(raw_filepath, "w", newline="") as csvfile: writer = csv.writer(csvfile) From 213ae37e36a15d4f4ce8055b26dc0d7368084f74 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 16 Dec 2024 13:55:26 +0000 Subject: [PATCH 33/54] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/tasks.py b/pipelines/tasks.py index 831e23627..475a5cd11 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -215,6 +215,7 @@ def run_subflow( if flag_failed_runs: raise FailedSubFlow(failed_message) + @task def run_dbt_selector( selector_name: str, From 30ab64b878f6707b7c0f52b712bf48ba71713f0f Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 16 Dec 2024 11:42:29 -0300 Subject: [PATCH 34/54] target dev --- pipelines/tasks.py | 46 -------------------------------------------- queries/profiles.yml | 2 +- 2 files changed, 1 insertion(+), 47 deletions(-) diff --git a/pipelines/tasks.py b/pipelines/tasks.py index 475a5cd11..7029385e5 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -216,52 +216,6 @@ def run_subflow( raise FailedSubFlow(failed_message) -@task -def run_dbt_selector( - selector_name: str, - flags: str = None, - _vars: dict | list[dict] = None, -): - """ - Runs a DBT selector. - - Args: - selector_name (str): The name of the DBT selector to run. - flags (str, optional): Flags to pass to the dbt run command. - _vars (Union[dict, list[dict]], optional): Variables to pass to dbt. Defaults to None. - """ - # Build the dbt command - run_command = f"dbt run --selector {selector_name}" - - if _vars: - if isinstance(_vars, list): - vars_dict = {} - for elem in _vars: - vars_dict.update(elem) - vars_str = f'"{vars_dict}"' - run_command += f" --vars {vars_str}" - else: - vars_str = f'"{_vars}"' - run_command += f" --vars {vars_str}" - - if flags: - run_command += f" {flags}" - - log(f"Running dbt with command: {run_command}") - root_path = get_root_path() - queries_dir = str(root_path / "queries") - dbt_task = DbtShellTask( - profiles_dir=queries_dir, - helper_script=f"cd {queries_dir}", - log_stderr=True, - return_all=True, - command=run_command, - ) - dbt_logs = dbt_task.run() - - log("\n".join(dbt_logs)) - - @task def get_timestamp_range(start_date: str = None, end_date: str = None) -> List[str]: """ diff --git a/queries/profiles.yml b/queries/profiles.yml index 3ac185e4d..054a713ef 100644 --- a/queries/profiles.yml +++ b/queries/profiles.yml @@ -78,4 +78,4 @@ queries: spark.executor.instances: "2" spark.driver.memory: 4g spark.driver.memoryOverhead: 1g - target: hmg \ No newline at end of file + target: dev \ No newline at end of file From 5686c0d6f0c66c0a167a36192c9203df09685aa2 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 6 Jan 2025 15:16:10 -0300 Subject: [PATCH 35/54] altera target para hmg --- queries/profiles.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queries/profiles.yml b/queries/profiles.yml index 054a713ef..3ac185e4d 100644 --- a/queries/profiles.yml +++ b/queries/profiles.yml @@ -78,4 +78,4 @@ queries: spark.executor.instances: "2" spark.driver.memory: 4g spark.driver.memoryOverhead: 1g - target: dev \ No newline at end of file + target: hmg \ No newline at end of file From b96ad20636327568c7df2fd31cea715abb1e61ea Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 6 Jan 2025 15:16:37 -0300 Subject: [PATCH 36/54] add source infracoes_renainf --- queries/models/sources.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/queries/models/sources.yml b/queries/models/sources.yml index 2f7a6f8b7..b002c928f 100644 --- a/queries/models/sources.yml +++ b/queries/models/sources.yml @@ -167,6 +167,7 @@ sources: - name: autuacoes_citran - name: autuacoes_serpro - name: receita_autuacao + - name: infracoes_renainf - name: dados_mestres database: datario From 672d1708ddc3d47f3d1fc15db27fb54788d71dc0 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 6 Jan 2025 15:18:03 -0300 Subject: [PATCH 37/54] adiciona novas colunas --- queries/models/transito/schema.yml | 2 +- .../transito/staging/autuacao_serpro.sql | 237 ++++++++++++++---- 2 files changed, 188 insertions(+), 51 deletions(-) diff --git a/queries/models/transito/schema.yml b/queries/models/transito/schema.yml index 880ae3987..0e42cea86 100644 --- a/queries/models/transito/schema.yml +++ b/queries/models/transito/schema.yml @@ -60,7 +60,7 @@ models: description: "Indicador de adesão do veículo ao SNE" - name: autuacao_serpro - description: "Tabela com dados de autuações fornecidos pelo sistema CITRAN/IplanRio" + description: "Tabela com dados de autuações fornecidos pelo sistema SERPRO" columns: - name: data description: "Data da autuação (Coluna de particionamento)" diff --git a/queries/models/transito/staging/autuacao_serpro.sql b/queries/models/transito/staging/autuacao_serpro.sql index d5a91e33c..6149624c6 100644 --- a/queries/models/transito/staging/autuacao_serpro.sql +++ b/queries/models/transito/staging/autuacao_serpro.sql @@ -1,52 +1,189 @@ +{{ config(materialized="view") }} -{{ config( - materialized='view' - ) -}} - - -SELECT - DATE(data) AS data, - auinf_num_auto AS id_auto_infracao, - DATETIME(PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', REGEXP_REPLACE(SAFE_CAST(JSON_VALUE(content, '$.auinf_dt_infracao') AS STRING), r'\.\d+', '')), 'America/Sao_Paulo') AS datetime_autuacao, - IF(JSON_VALUE(content, '$.auinf_dt_limite_defesa_previa') != '', SAFE_CAST(PARSE_DATE('%Y-%m-%d', JSON_VALUE(content,'$.auinf_dt_limite_defesa_previa')) AS STRING), NULL) AS data_limite_defesa_previa, - IF(JSON_VALUE(content, '$.auinf_dt_limite_recurso') != '', SAFE_CAST(PARSE_DATE('%Y-%m-%d', JSON_VALUE(content,'$.auinf_dt_limite_recurso')) AS STRING), NULL) AS data_limite_recurso, - SAFE_CAST(JSON_VALUE(content,'$.stat_dsc_status_ai') AS STRING) AS descricao_situacao_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.stfu_dsc_status_fluxo_ai') AS STRING) AS status_infracao, - SAFE_CAST(JSON_VALUE(content,'$.htpi_cod_tipo_infracao') AS STRING) AS codigo_enquadramento, - SAFE_CAST(JSON_VALUE(content,'$.htpi_dsc_tipo_infracao') AS STRING) AS tipificacao_resumida, - SAFE_CAST(JSON_VALUE(content,'$.htpi_pontosdainfracao') AS STRING) AS pontuacao, - SAFE_CAST(JSON_VALUE(content,'$.hgrav_descricao') AS STRING) AS gravidade, - SAFE_CAST(JSON_VALUE(content,'$.htpi_amparo_legal') AS STRING) AS amparo_legal, - SAFE_CAST(JSON_VALUE(content,'$.auinf_vei_tipo') AS STRING) AS tipo_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_marca_modelo_informado') AS STRING) AS descricao_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_placa') AS STRING) AS placa_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_ano_fabricacao') AS STRING) AS ano_fabricacao_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_ano_modelo') AS STRING) AS ano_modelo_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_cor_desc') AS STRING) AS cor_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_especie_desc') AS STRING) AS especie_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.uf_infrator') AS STRING) AS uf_infrator, - SAFE_CAST(JSON_VALUE(content,'$.uf_princ_cond') AS STRING) AS uf_principal_condutor, - SAFE_CAST(JSON_VALUE(content,'$.uf_prop_orig') AS STRING) AS uf_proprietario, - SAFE_CAST(JSON_VALUE(content,'$.auinf_infracao_valor') AS NUMERIC) AS valor_infracao, - SAFE_CAST(JSON_VALUE(content,'$.pag_valor') AS NUMERIC) AS valor_pago, - SAFE_CAST(JSON_VALUE(content,'$.auinf_id_orgao') AS STRING) AS id_autuador, - SAFE_CAST(JSON_VALUE(content,'$.unaut_dsc_unidade') AS STRING) AS descricao_autuador, - SAFE_CAST(JSON_VALUE(content,'$.id_municipio') AS STRING) AS id_municipio_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.mun_nome') AS STRING) AS descricao_municipio, - SAFE_CAST(JSON_VALUE(content,'$.uf_sigla') AS STRING) AS uf_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.auinf_local_ende_bairro') AS STRING) AS bairro_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.auinf_local_ende_cep') AS STRING) AS cep_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.auinf_local_ende_logradouro') AS STRING) AS logradouro_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.auinf_local_ende_numero') AS STRING) AS ende_numero_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.complemento') AS STRING) AS complemento, - SAFE_CAST(JSON_VALUE(content,'$.defp_num_processo') AS STRING) AS processo_defesa_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.rrso_num_processo') AS STRING) AS rrso_num_processo, - SAFE_CAST(JSON_VALUE(content,'$.cpa_num_processo') AS STRING) AS cpa_num_processo, - SAFE_CAST(JSON_VALUE(content,'$.susp_num_processo') AS STRING) AS susp_num_processo, - SAFE_CAST(JSON_VALUE(content,'$.canc_num_processo') AS STRING) AS recurso_penalidade_multa, - SAFE_CAST(JSON_VALUE(content,'$.ri_proc_nr') AS STRING) AS processo_troca_real_infrator, - SAFE_CAST(JSON_VALUE(content,'$.auinf_veiculo_adesao_sne_indicador') AS STRING) AS status_sne -FROM - {{ source('infracao_staging','autuacoes_serpro') }} +select + date(data) as data, + auinf_num_auto as id_auto_infracao, + safe_cast( + json_value(content, '$.auinf_origem_desc') as string + ) as origem_auto_infracao, + datetime( + parse_timestamp( + '%Y-%m-%d %H:%M:%S', + regexp_replace( + safe_cast(json_value(content, '$.auinf_dt_infracao') as string), + r'\.\d+', + '' + ) + ), + 'America/Sao_Paulo' + ) as datetime_autuacao, + if( + json_value(content, '$.auinf_dt_limite_defesa_previa') != '', + safe_cast( + parse_date( + '%Y-%m-%d', json_value(content, '$.auinf_dt_limite_defesa_previa') + ) as string + ), + null + ) as data_limite_defesa_previa, + if( + json_value(content, '$.auinf_dt_limite_recurso') != '', + safe_cast( + parse_date( + '%Y-%m-%d', json_value(content, '$.auinf_dt_limite_recurso') + ) as string + ), + null + ) as data_limite_recurso, + safe_cast( + json_value(content, '$.stat_dsc_status_ai') as string + ) as descricao_situacao_autuacao, + safe_cast( + json_value(content, '$.stfu_dsc_status_fluxo_ai') as string + ) as status_infracao, + safe_cast( + json_value(content, '$.htpi_cod_tipo_infracao') as string + ) as codigo_enquadramento, + safe_cast( + json_value(content, '$.htpi_desdobramento') as string + ) as codigo_desdobramento, + safe_cast( + json_value(content, '$.htpi_dsc_tipo_infracao') as string + ) as tipificacao_resumida, + safe_cast(json_value(content, '$.htpi_pontosdainfracao') as string) as pontuacao, + safe_cast(json_value(content, '$.hgrav_descricao') as string) as gravidade, + safe_cast(json_value(content, '$.htpi_amparo_legal') as string) as amparo_legal, + safe_cast( + json_value(content, '$.auinf_infracao_medicao_valor_aferido') as string + ) as velocidade_aferida, + safe_cast( + json_value(content, '$.auinf_infracao_medicao_valor_considerado') as string + ) as velocidade_considerada, + safe_cast( + json_value(content, '$.auinf_infracao_medicao_limite_regulam') as string + ) as velocidade_regulamentada, + safe_cast(json_value(content, '$.auinf_vei_tipo') as string) as tipo_veiculo, + safe_cast( + json_value(content, '$.auinf_veiculo_marca_modelo_informado') as string + ) as descricao_veiculo, + safe_cast(json_value(content, '$.auinf_veiculo_placa') as string) as placa_veiculo, + safe_cast( + json_value(content, '$.auinf_veiculo_chassi') as string + ) as chassi_veiculo, + safe_cast( + json_value(content, '$.auinf_veiculo_ano_fabricacao') as string + ) as ano_fabricacao_veiculo, + safe_cast( + json_value(content, '$.auinf_veiculo_ano_modelo') as string + ) as ano_modelo_veiculo, + safe_cast(json_value(content, '$.auinf_veiculo_cor_desc') as string) as cor_veiculo, + safe_cast( + json_value(content, '$.auinf_veiculo_especie_desc') as string + ) as especie_veiculo, + safe_cast(json_value(content, '$.uf_veiculo') as string) as uf_veiculo, + safe_cast( + json_value(content, '$.nome_proprietario') as string + ) as nome_proprietario, + safe_cast( + json_value(content, '$.numero_identificacao_proprietario') as string + ) as cpf_proprietario, + safe_cast( + json_value(content, '$.numero_cnh_proprietario') as string + ) as cnh_proprietario, + safe_cast(json_value(content, '$.uf_cnh_prop') as string) as uf_cnh_proprietario, + safe_cast(json_value(content, '$.uf_prop_orig') as string) as uf_proprietario, + safe_cast( + json_value( + content, '$.auinf_infrator_condutor_nao_habilitado_numero_doc' + ) as string + ) as numero_condutor_nao_habilitado, + safe_cast(json_value(content, '$.nome_ch_condutor') as string) as nome_condutor, + safe_cast( + json_value(content, '$.numero_identificacao_condutor') as string + ) as numero_identificacao_condutor, + safe_cast( + json_value(content, '$.numero_registro_ch_condutor') as string + ) as cnh_condutor, + safe_cast( + json_value(content, '$.uf_princ_cond') as string + ) as uf_principal_condutor, + safe_cast(json_value(content, '$.nome_infrator') as string) as nome_infrator, + safe_cast(json_value(content, '$.cpf_infrator') as string) as cpf_infrator, + safe_cast( + json_value(content, '$.auinf_infrator_condutor_habilitado_numero_doc') as string + ) as cnh_infrator, + safe_cast( + json_value(content, '$.auinf_infracao_valor') as numeric + ) as valor_infracao, + safe_cast(json_value(content, '$.pag_valor') as numeric) as valor_pago, + if( + json_value(content, '$.pag_data_pagamento') != '', + date( + parse_timestamp( + '%Y-%m-%d %H:%M:%S', + regexp_replace( + safe_cast(json_value(content, '$.pag_data_pagamento') as string), + r'\.\d+', + '' + ) + ), + 'America/Sao_Paulo' + ), + null + ) as data_pagamento, + safe_cast(json_value(content, '$.ban_codigo_banco') as string) as codigo_banco, + safe_cast(json_value(content, '$.ban_nome_banco') as string) as nome_banco, + safe_cast( + json_value(content, '$.pag_status_pagamento') as string + ) as status_pagamento, + safe_cast( + json_value(content, '$.auinf_codigo_renainf') as string + ) as codigo_auto_infracao_renainf, + safe_cast(json_value(content, '$.auinf_id_orgao') as string) as id_autuador, + safe_cast( + json_value(content, '$.unaut_dsc_unidade') as string + ) as descricao_autuador, + safe_cast( + json_value(content, '$.usu_num_matricula') as string + ) as matricula_autuador, + safe_cast(json_value(content, '$.id_municipio') as string) as id_municipio_autuacao, + safe_cast(json_value(content, '$.mun_nome') as string) as descricao_municipio, + safe_cast(json_value(content, '$.uf_sigla') as string) as uf_autuacao, + safe_cast( + json_value(content, '$.auinf_local_ende_bairro') as string + ) as bairro_autuacao, + safe_cast(json_value(content, '$.auinf_local_ende_cep') as string) as cep_autuacao, + safe_cast( + json_value(content, '$.auinf_local_ende_logradouro') as string + ) as logradouro_autuacao, + safe_cast( + json_value(content, '$.auinf_local_ende_numero') as string + ) as ende_numero_autuacao, + safe_cast(json_value(content, '$.complemento') as string) as complemento, + safe_cast( + json_value(content, '$.auinf_local_rodovia') as string + ) as logradouro_autuacao_2, -- Deveria ser somente rodovias + safe_cast( + json_value(content, '$.auinf_observacao') as string + ) as observacao_autuacao, + safe_cast( + json_value(content, '$.defp_num_processo') as string + ) as processo_defesa_autuacao, + safe_cast( + json_value(content, '$.rrso_num_processo') as string + ) as rrso_num_processo, + safe_cast(json_value(content, '$.cpa_num_processo') as string) as cpa_num_processo, + safe_cast( + json_value(content, '$.susp_num_processo') as string + ) as susp_num_processo, + safe_cast( + json_value(content, '$.canc_num_processo') as string + ) as recurso_penalidade_multa, + safe_cast( + json_value(content, '$.ri_proc_nr') as string + ) as processo_troca_real_infrator, + safe_cast( + json_value(content, '$.auinf_veiculo_adesao_sne_indicador') as string + ) as status_sne +from {{ source("infracao_staging", "autuacoes_serpro") }} From b11da43a52d2be59ea48e8b895ddba73b2316663 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Mon, 6 Jan 2025 15:19:05 -0300 Subject: [PATCH 38/54] add join infracoes_renainf --- queries/models/transito/autuacao.sql | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index f2738474f..edb2c0293 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -9,7 +9,15 @@ incremental_strategy='insert_overwrite' ) }} -WITH citran AS ( +-- add lógica para ter um historico da autuação com base na coluna de data_atualização + +WITH infracoes_renainf AS ( + SELECT CONCAT(codigo_infracao, desdobramento) AS codigo_enquadramento, + descricao_infracao AS tipificacao_resumida + FROM + {{ source("infracao_staging", "infracoes_renainf") }} +), +citran AS ( SELECT data, id_auto_infracao, @@ -19,7 +27,7 @@ WITH citran AS ( situacao_atual AS descricao_situacao_autuacao, IF(status_infracao != "", status_infracao, NULL) AS status_infracao, REPLACE(codigo_enquadramento, '-', '') AS codigo_enquadramento, - IF(tipificacao_resumida != "", tipificacao_resumida, NULL) AS tipificacao_resumida, + i.tipificacao_resumida, SAFE_CAST(SUBSTR(REGEXP_EXTRACT(pontuacao, r'\d+'), 2) AS STRING) AS pontuacao, CASE WHEN INITCAP(REGEXP_REPLACE(pontuacao, r'\d+', '')) = 'Media' THEN 'Média' @@ -61,6 +69,9 @@ WITH citran AS ( FALSE AS status_sne, "CITRAN" AS fonte FROM {{ ref('autuacao_citran') }} + LEFT JOIN + infracoes_renainf AS i + USING(codigo_enquadramento) {% if is_incremental() %} WHERE data BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") @@ -75,8 +86,8 @@ serpro AS ( data_limite_recurso, descricao_situacao_autuacao, IF(status_infracao != "", status_infracao, NULL) AS status_infracao, - IF(codigo_enquadramento != "", codigo_enquadramento, NULL) AS codigo_enquadramento, - IF(tipificacao_resumida != "", tipificacao_resumida, NULL) AS tipificacao_resumida, + CONCAT(codigo_enquadramento, codigo_desdobramento) AS codigo_enquadramento, + i.tipificacao_resumida, SUBSTR(pontuacao, 1, 1) AS pontuacao, gravidade, amparo_legal, @@ -93,7 +104,7 @@ serpro AS ( SAFE_CAST(NULL AS STRING) AS cep_proprietario, valor_infracao, valor_pago, - SAFE_CAST(NULL AS STRING) AS data_pagamento, + data_pagamento, COALESCE(id_autuador, "260010") AS id_autuador, IF(descricao_autuador != "", descricao_autuador, NULL) AS descricao_autuador, COALESCE(id_municipio_autuacao,"6001") AS id_municipio_autuacao, @@ -111,6 +122,9 @@ serpro AS ( IF(status_sne = "1.0", TRUE, FALSE) AS status_sne, "SERPRO" AS fonte FROM {{ ref('autuacao_serpro') }} + LEFT JOIN + infracoes_renainf AS i + USING(codigo_enquadramento) {% if is_incremental() %} WHERE data BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") From 7cc3368641fa7e555da1969b5d04562a5868b06d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 6 Jan 2025 19:10:50 +0000 Subject: [PATCH 39/54] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/tasks.py b/pipelines/tasks.py index 8b6daec4b..dc9e10ace 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -243,6 +243,7 @@ def get_timestamp_range(start_date: str = None, end_date: str = None) -> List[st return timestamps + @task(trigger=all_finished) def check_fail(results: Union[list, str]): """ From bf47bc978d3fdbd890ba259a20318b8b3fdf93bb Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 7 Jan 2025 13:35:14 -0300 Subject: [PATCH 40/54] altera where para teste --- pipelines/serpro/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 6341eca4f..52e1d5671 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -25,7 +25,7 @@ class constants(Enum): # pylint: disable=c0103 FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view WHERE - SUBSTRING(auinf_dt_infracao, 1, 10) = '{date}' + TO_DATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'YYYY-MM-DD') = '{date}' """, "primary_key": ["auinf_num_auto"], "pre_treatment_reader_args": {"dtype": "object"}, From 5851055ac468e1a42d49ce2e801505b33b568951 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 7 Jan 2025 13:58:08 -0300 Subject: [PATCH 41/54] altera where para teste --- pipelines/serpro/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 52e1d5671..403ff20f2 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -25,7 +25,7 @@ class constants(Enum): # pylint: disable=c0103 FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view WHERE - TO_DATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'YYYY-MM-DD') = '{date}' + PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd') = '{date}' """, "primary_key": ["auinf_num_auto"], "pre_treatment_reader_args": {"dtype": "object"}, From 2414013d3025f77e5983df26a486f09747c9df17 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 7 Jan 2025 16:42:40 -0300 Subject: [PATCH 42/54] atualiza modelos --- queries/models/transito/autuacao.sql | 461 +++++++++++------- .../transito/staging/autuacao_citran.sql | 72 +-- .../transito/staging/autuacao_serpro.sql | 1 + 3 files changed, 322 insertions(+), 212 deletions(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index edb2c0293..af67ab94b 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -1,181 +1,281 @@ -{{ config( - materialized='incremental', - partition_by={ - "field":"data", - "data_type":"date", - "granularity": "day" - }, - unique_key='id_autuacao', - incremental_strategy='insert_overwrite' -) }} +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + unique_key="id_autuacao", + incremental_strategy="insert_overwrite", + ) +}} --- add lógica para ter um historico da autuação com base na coluna de data_atualização - -WITH infracoes_renainf AS ( - SELECT CONCAT(codigo_infracao, desdobramento) AS codigo_enquadramento, - descricao_infracao AS tipificacao_resumida - FROM - {{ source("infracao_staging", "infracoes_renainf") }} -), -citran AS ( - SELECT - data, - id_auto_infracao, - DATETIME(concat(data,' ',hora,':00')) AS datetime_autuacao, - data_limite_defesa_previa, - data_limite_recurso, - situacao_atual AS descricao_situacao_autuacao, - IF(status_infracao != "", status_infracao, NULL) AS status_infracao, - REPLACE(codigo_enquadramento, '-', '') AS codigo_enquadramento, - i.tipificacao_resumida, - SAFE_CAST(SUBSTR(REGEXP_EXTRACT(pontuacao, r'\d+'), 2) AS STRING) AS pontuacao, - CASE - WHEN INITCAP(REGEXP_REPLACE(pontuacao, r'\d+', '')) = 'Media' THEN 'Média' - WHEN INITCAP(REGEXP_REPLACE(pontuacao, r'\d+', '')) = 'Gravissima' THEN 'Gravíssima' - ELSE INITCAP(REGEXP_REPLACE(pontuacao, r'\d+', '')) - END AS gravidade, - SAFE_CAST(NULL AS STRING) AS amparo_legal, - INITCAP(tipo_veiculo) AS tipo_veiculo, - IF(descricao_veiculo != "", descricao_veiculo, NULL) AS descricao_veiculo, - SAFE_CAST(NULL AS STRING) AS placa_veiculo, - SAFE_CAST(NULL AS STRING) AS ano_fabricacao_veiculo, - SAFE_CAST(NULL AS STRING) AS ano_modelo_veiculo, - SAFE_CAST(NULL AS STRING) AS cor_veiculo, - CASE - WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Misto', '0Misto') THEN 'Misto' - WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Passageir', '0Passageir', 'Passageiro', '0Passageiro') THEN 'Passageiro' - WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Tracao', '0Tracao', 'Tracao') THEN 'Tração' - WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Nao Inform', '0Nao Inform', 'Nao Informado', '0Nao Informado') THEN 'Não informado' - WHEN INITCAP(REGEXP_REPLACE(especie_veiculo, r'\d+', '')) IN ('Carga', '0Carga') THEN 'Carga' - ELSE 'Inválido' - END AS especie_veiculo, - SAFE_CAST(NULL AS STRING) AS uf_infrator, - SAFE_CAST(NULL AS STRING) AS uf_principal_condutor, - IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, - IF(cep_proprietario != "", cep_proprietario, NULL) AS cep_proprietario, - valor_infracao / 100 AS valor_infracao, - valor_pago / 100 AS valor_pago, - data_pagamento, - "260010" AS id_autuador, - IF(descricao_autuador != "", descricao_autuador, NULL) AS descricao_autuador, - "6001" AS id_municipio_autuacao, - "RIO DE JANEIRO" AS descricao_municipio, - "RJ" AS uf_autuacao, - endereco_autuacao AS cep_autuacao, - NULL AS tile_autuacao, - IF(processo_defesa_autuacao != "00000000" AND processo_defesa_autuacao != "" , processo_defesa_autuacao, NULL) AS processo_defesa_autuacao, - IF(recurso_penalidade_multa != "00000000" AND recurso_penalidade_multa != "" , recurso_penalidade_multa, NULL) AS recurso_penalidade_multa, - IF(processo_troca_real_infrator != "00000000" AND processo_troca_real_infrator != "" , processo_troca_real_infrator, NULL) AS processo_troca_real_infrator, - FALSE AS status_sne, - "CITRAN" AS fonte - FROM {{ ref('autuacao_citran') }} - LEFT JOIN - infracoes_renainf AS i - USING(codigo_enquadramento) - {% if is_incremental() %} - WHERE - data BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") - {% endif %} -), -serpro AS ( - SELECT - data, - id_auto_infracao, - datetime_autuacao, - data_limite_defesa_previa, - data_limite_recurso, - descricao_situacao_autuacao, - IF(status_infracao != "", status_infracao, NULL) AS status_infracao, - CONCAT(codigo_enquadramento, codigo_desdobramento) AS codigo_enquadramento, - i.tipificacao_resumida, - SUBSTR(pontuacao, 1, 1) AS pontuacao, - gravidade, - amparo_legal, - INITCAP(tipo_veiculo) AS tipo_veiculo, - IF(descricao_veiculo != "", descricao_veiculo, NULL) AS descricao_veiculo, - placa_veiculo, - ano_fabricacao_veiculo, - ano_modelo_veiculo, - cor_veiculo, - especie_veiculo, - uf_infrator, - uf_principal_condutor, - IF(uf_proprietario != "", uf_proprietario, NULL) AS uf_proprietario, - SAFE_CAST(NULL AS STRING) AS cep_proprietario, - valor_infracao, - valor_pago, - data_pagamento, - COALESCE(id_autuador, "260010") AS id_autuador, - IF(descricao_autuador != "", descricao_autuador, NULL) AS descricao_autuador, - COALESCE(id_municipio_autuacao,"6001") AS id_municipio_autuacao, - COALESCE(descricao_municipio, "RIO DE JANEIRO") AS descricao_municipio, - COALESCE(uf_autuacao,"RJ") AS uf_autuacao, - CASE - WHEN logradouro_autuacao IS NOT NULL THEN - RTRIM(REGEXP_REPLACE(CONCAT(logradouro_autuacao, ' ', bairro_autuacao, ' ', complemento), r'\s+', ' ')) - ELSE NULL - END AS cep_autuacao, - NULL AS tile_autuacao, - processo_defesa_autuacao, - recurso_penalidade_multa, - processo_troca_real_infrator, - IF(status_sne = "1.0", TRUE, FALSE) AS status_sne, - "SERPRO" AS fonte - FROM {{ ref('autuacao_serpro') }} - LEFT JOIN - infracoes_renainf AS i - USING(codigo_enquadramento) - {% if is_incremental() %} - WHERE - data BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") - {% endif %} -) -SELECT - data, - TO_HEX(SHA256(CONCAT(GENERATE_UUID(), id_auto_infracao))) AS id_autuacao, - id_auto_infracao, - datetime_autuacao, - data_limite_defesa_previa, - data_limite_recurso, - descricao_situacao_autuacao, - status_infracao, - codigo_enquadramento, - tipificacao_resumida, - pontuacao, - gravidade, - amparo_legal, - tipo_veiculo, - descricao_veiculo, - placa_veiculo, - ano_fabricacao_veiculo, - ano_modelo_veiculo, - cor_veiculo, - especie_veiculo, - uf_infrator, - uf_principal_condutor, - uf_proprietario, - cep_proprietario, - valor_infracao, - valor_pago, - data_pagamento, - id_autuador, - descricao_autuador, - id_municipio_autuacao, - descricao_municipio, - uf_autuacao, - cep_autuacao, - tile_autuacao, - processo_defesa_autuacao, - recurso_penalidade_multa, - processo_troca_real_infrator, - status_sne, - fonte -FROM - citran -UNION ALL -SELECT +with + infracoes_renainf as ( + select + concat(codigo_infracao, desdobramento) as codigo_enquadramento, + descricao_infracao as tipificacao_resumida + from {{ source("infracao_staging", "infracoes_renainf") }} + ), + citran as ( + select + data, + id_auto_infracao, + datetime(concat(data, ' ', hora, ':00')) as datetime_autuacao, + data_limite_defesa_previa, + data_limite_recurso, + situacao_atual as descricao_situacao_autuacao, + if(status_infracao != "", status_infracao, null) as status_infracao, + replace(codigo_enquadramento, '-', '') as codigo_enquadramento, + safe_cast( + substr(regexp_extract(pontuacao, r'\d+'), 2) as string + ) as pontuacao, + case + when initcap(regexp_replace(pontuacao, r'\d+', '')) = 'Media' + then 'Média' + when initcap(regexp_replace(pontuacao, r'\d+', '')) = 'Gravissima' + then 'Gravíssima' + else initcap(regexp_replace(pontuacao, r'\d+', '')) + end as gravidade, + safe_cast(null as string) as amparo_legal, + initcap(tipo_veiculo) as tipo_veiculo, + if(descricao_veiculo != "", descricao_veiculo, null) as descricao_veiculo, + safe_cast(null as string) as placa_veiculo, + safe_cast(null as string) as ano_fabricacao_veiculo, + safe_cast(null as string) as ano_modelo_veiculo, + safe_cast(null as string) as cor_veiculo, + case + when + initcap(regexp_replace(especie_veiculo, r'\d+', '')) + in ('Misto', '0Misto') + then 'Misto' + when + initcap(regexp_replace(especie_veiculo, r'\d+', '')) + in ('Passageir', '0Passageir', 'Passageiro', '0Passageiro') + then 'Passageiro' + when + initcap(regexp_replace(especie_veiculo, r'\d+', '')) + in ('Tracao', '0Tracao', 'Tracao') + then 'Tração' + when + initcap(regexp_replace(especie_veiculo, r'\d+', '')) + in ('Nao Inform', '0Nao Inform', 'Nao Informado', '0Nao Informado') + then 'Não informado' + when + initcap(regexp_replace(especie_veiculo, r'\d+', '')) + in ('Carga', '0Carga') + then 'Carga' + else 'Inválido' + end as especie_veiculo, + safe_cast(null as string) as uf_infrator, + safe_cast(null as string) as uf_principal_condutor, + if(uf_proprietario != "", uf_proprietario, null) as uf_proprietario, + if(cep_proprietario != "", cep_proprietario, null) as cep_proprietario, + valor_infracao / 100 as valor_infracao, + valor_pago / 100 as valor_pago, + data_pagamento, + "260010" as id_autuador, + if( + descricao_autuador != "", descricao_autuador, null + ) as descricao_autuador, + "6001" as id_municipio_autuacao, + "RIO DE JANEIRO" as descricao_municipio, + "RJ" as uf_autuacao, + endereco_autuacao as cep_autuacao, + null as tile_autuacao, + if( + processo_defesa_autuacao != "00000000" + and processo_defesa_autuacao != "", + processo_defesa_autuacao, + null + ) as processo_defesa_autuacao, + if( + recurso_penalidade_multa != "00000000" + and recurso_penalidade_multa != "", + recurso_penalidade_multa, + null + ) as recurso_penalidade_multa, + if( + processo_troca_real_infrator != "00000000" + and processo_troca_real_infrator != "", + processo_troca_real_infrator, + null + ) as processo_troca_real_infrator, + false as status_sne, + "CITRAN" as fonte, + datetime("2023-08-26") as datetime_ultima_atualizacao + from {{ ref("autuacao_citran") }} + {% if is_incremental() %} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + {% endif %} + ), + serpro as ( + select + data, + id_auto_infracao, + datetime_autuacao, + data_limite_defesa_previa, + data_limite_recurso, + descricao_situacao_autuacao, + if(status_infracao != "", status_infracao, null) as status_infracao, + concat(codigo_enquadramento, codigo_desdobramento) as codigo_enquadramento, + substr(pontuacao, 1, 1) as pontuacao, + gravidade, + amparo_legal, + initcap(tipo_veiculo) as tipo_veiculo, + if(descricao_veiculo != "", descricao_veiculo, null) as descricao_veiculo, + placa_veiculo, + ano_fabricacao_veiculo, + ano_modelo_veiculo, + cor_veiculo, + especie_veiculo, + uf_infrator, + uf_principal_condutor, + if(uf_proprietario != "", uf_proprietario, null) as uf_proprietario, + safe_cast(null as string) as cep_proprietario, + valor_infracao, + valor_pago, + data_pagamento, + coalesce(id_autuador, "260010") as id_autuador, + if( + descricao_autuador != "", descricao_autuador, null + ) as descricao_autuador, + coalesce(id_municipio_autuacao, "6001") as id_municipio_autuacao, + coalesce(descricao_municipio, "RIO DE JANEIRO") as descricao_municipio, + coalesce(uf_autuacao, "RJ") as uf_autuacao, + case + when logradouro_autuacao is not null + then + rtrim( + regexp_replace( + concat( + logradouro_autuacao, + ' ', + bairro_autuacao, + ' ', + complemento + ), + r'\s+', + ' ' + ) + ) + when logradouro_autuacao_2 is not null + then logradouro_autuacao_2 + else null + end as cep_autuacao, + null as tile_autuacao, + processo_defesa_autuacao, + recurso_penalidade_multa, + processo_troca_real_infrator, + if(status_sne = "1.0", true, false) as status_sne, + "SERPRO" as fonte, + current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao + from {{ ref("autuacao_serpro") }} + {% if is_incremental() %} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + {% endif %} + ), + autuacao as ( + select + data, + to_hex(sha256(concat(generate_uuid(), id_auto_infracao))) as id_autuacao, -- trocar generate_uuid ? + id_auto_infracao, + datetime_autuacao, + data_limite_defesa_previa, + data_limite_recurso, + descricao_situacao_autuacao, + status_infracao, + codigo_enquadramento, + pontuacao, + gravidade, + amparo_legal, + tipo_veiculo, + descricao_veiculo, + placa_veiculo, + ano_fabricacao_veiculo, + ano_modelo_veiculo, + cor_veiculo, + especie_veiculo, + uf_infrator, + uf_principal_condutor, + uf_proprietario, + cep_proprietario, + valor_infracao, + valor_pago, + data_pagamento, + id_autuador, + descricao_autuador, + id_municipio_autuacao, + descricao_municipio, + uf_autuacao, + cep_autuacao, + tile_autuacao, + processo_defesa_autuacao, + recurso_penalidade_multa, + processo_troca_real_infrator, + status_sne, + fonte, + datetime_ultima_atualizacao + from citran + union all + select + data, + to_hex(sha256(concat(generate_uuid(), id_auto_infracao))) as id_autuacao, -- trocar generate_uuid ? + id_auto_infracao, + datetime_autuacao, + data_limite_defesa_previa, + data_limite_recurso, + descricao_situacao_autuacao, + status_infracao, + codigo_enquadramento, + pontuacao, + gravidade, + amparo_legal, + tipo_veiculo, + descricao_veiculo, + placa_veiculo, + ano_fabricacao_veiculo, + ano_modelo_veiculo, + cor_veiculo, + especie_veiculo, + uf_infrator, + uf_principal_condutor, + uf_proprietario, + cep_proprietario, + valor_infracao, + valor_pago, + data_pagamento, + id_autuador, + descricao_autuador, + id_municipio_autuacao, + descricao_municipio, + uf_autuacao, + cep_autuacao, + tile_autuacao, + processo_defesa_autuacao, + recurso_penalidade_multa, + processo_troca_real_infrator, + status_sne, + fonte, + datetime_ultima_atualizacao + from serpro + ), + update_partition as ( + select a.* + from autuacao a + {% if is_incremental() %} + left join {{ this }} as t using (id_auto_infracao) + where + a.status_infracao != t.status_infracao + or a.data_pagamento != t.data_pagamento + {% endif %} + ) +select data, - TO_HEX(SHA256(CONCAT(GENERATE_UUID(), id_auto_infracao))) AS id_autuacao, + id_autuacao, id_auto_infracao, datetime_autuacao, data_limite_defesa_previa, @@ -183,7 +283,7 @@ SELECT descricao_situacao_autuacao, status_infracao, codigo_enquadramento, - tipificacao_resumida, + i.tipificacao_resumida, pontuacao, gravidade, amparo_legal, @@ -212,6 +312,7 @@ SELECT recurso_penalidade_multa, processo_troca_real_infrator, status_sne, - fonte -FROM - serpro \ No newline at end of file + fonte, + datetime_ultima_atualizacao +from update_partition +left join infracoes_renainf as i using (codigo_enquadramento) diff --git a/queries/models/transito/staging/autuacao_citran.sql b/queries/models/transito/staging/autuacao_citran.sql index 5e56ba37b..e7519d51b 100644 --- a/queries/models/transito/staging/autuacao_citran.sql +++ b/queries/models/transito/staging/autuacao_citran.sql @@ -1,35 +1,43 @@ +{{ config(materialized="view") }} -{{ config( - materialized='view' - ) -}} +select + date(data) as data, + safe_cast(json_value(content, '$.Hora') as string) hora, + -- fmt: off + Cod__Detran as id_auto_infracao, + -- fmt: on + if( + json_value(content, '$.DtLimDP') != '', + safe_cast(parse_date('%d/%m/%Y', json_value(content, '$.DtLimDP')) as string), + null + ) data_limite_defesa_previa, + if( + json_value(content, '$.DtLimR') != '', + safe_cast(parse_date('%d/%m/%Y', json_value(content, '$.DtLimR')) as string), + null + ) data_limite_recurso, + safe_cast(json_value(content, '$.Situacao Atual') as string) situacao_atual, + safe_cast(json_value(content, '$."St. Infracao"') as string) status_infracao, + safe_cast(json_value(content, '$.Multa') as string) codigo_enquadramento, + safe_cast(json_value(content, '$.DsInf') as string) tipificacao_resumida, + safe_cast(json_value(content, '$.Po') as string) pontuacao, + safe_cast(json_value(content, '$.Tipo') as string) tipo_veiculo, + safe_cast(json_value(content, '$.Marca') as string) descricao_veiculo, + safe_cast(json_value(content, '$.Esp') as string) especie_veiculo, + safe_cast(json_value(content, '$.CDUF') as string) uf_proprietario, + safe_cast(json_value(content, '$.Cep') as string) cep_proprietario, + safe_cast(json_value(content, '$.Ufir') as numeric) valor_infracao, + safe_cast(json_value(content, '$.VlPagto') as numeric) valor_pago, + if( + json_value(content, '$.DtPagto') != '', + parse_date('%d/%m/%Y', json_value(content, '$.DtPagto')), + null + ) data_pagamento, + safe_cast(json_value(content, '$.Orgao') as string) descricao_autuador, + safe_cast(json_value(content, '$.LocInf') as string) endereco_autuacao, + safe_cast(json_value(content, '$.ProAutu') as string) processo_defesa_autuacao, + safe_cast(json_value(content, '$.NotifPen') as string) recurso_penalidade_multa, + safe_cast(json_value(content, '$.ProcRI') as string) processo_troca_real_infrator, -SELECT - DATE(data) AS data, - SAFE_CAST(JSON_VALUE(content,'$.Hora') AS STRING) hora, - Cod__Detran as id_auto_infracao, - IF(JSON_VALUE(content, '$.DtLimDP') != '', SAFE_CAST(PARSE_DATE('%d/%m/%Y', JSON_VALUE(content,'$.DtLimDP')) AS STRING), NULL) data_limite_defesa_previa, - IF(JSON_VALUE(content, '$.DtLimR') != '', SAFE_CAST(PARSE_DATE('%d/%m/%Y', JSON_VALUE(content,'$.DtLimR')) AS STRING), NULL) data_limite_recurso, - SAFE_CAST(JSON_VALUE(content,'$.Situacao Atual') AS STRING) situacao_atual, - SAFE_CAST(JSON_VALUE(content,'$."St. Infracao"') AS STRING) status_infracao, - SAFE_CAST(JSON_VALUE(content,'$.Multa') AS STRING) codigo_enquadramento, - SAFE_CAST(JSON_VALUE(content,'$.DsInf') AS STRING) tipificacao_resumida, - SAFE_CAST(JSON_VALUE(content,'$.Po') AS STRING) pontuacao, - SAFE_CAST(JSON_VALUE(content,'$.Tipo') AS STRING) tipo_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.Marca') AS STRING) descricao_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.Esp') AS STRING) especie_veiculo, - SAFE_CAST(JSON_VALUE(content,'$.CDUF') AS STRING) uf_proprietario, - SAFE_CAST(JSON_VALUE(content,'$.Cep') AS STRING) cep_proprietario, - SAFE_CAST(JSON_VALUE(content,'$.Ufir') AS NUMERIC) valor_infracao, - SAFE_CAST(JSON_VALUE(content,'$.VlPagto') AS NUMERIC) valor_pago, - IF(JSON_VALUE(content, '$.DtPagto') != '', SAFE_CAST(PARSE_DATE('%d/%m/%Y', JSON_VALUE(content,'$.DtPagto')) AS STRING), NULL) data_pagamento, - SAFE_CAST(JSON_VALUE(content,'$.Orgao') AS STRING) descricao_autuador, - SAFE_CAST(JSON_VALUE(content,'$.LocInf') AS STRING) endereco_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.ProAutu') AS STRING) processo_defesa_autuacao, - SAFE_CAST(JSON_VALUE(content,'$.NotifPen') AS STRING) recurso_penalidade_multa, - SAFE_CAST(JSON_VALUE(content,'$.ProcRI') AS STRING) processo_troca_real_infrator, - -FROM - {{ source('infracao_staging','autuacoes_citran') }} as t - +from {{ source("infracao_staging", "autuacoes_citran") }} as t diff --git a/queries/models/transito/staging/autuacao_serpro.sql b/queries/models/transito/staging/autuacao_serpro.sql index 6149624c6..8751d0286 100644 --- a/queries/models/transito/staging/autuacao_serpro.sql +++ b/queries/models/transito/staging/autuacao_serpro.sql @@ -113,6 +113,7 @@ select safe_cast( json_value(content, '$.auinf_infrator_condutor_habilitado_numero_doc') as string ) as cnh_infrator, + safe_cast(json_value(content, '$.uf_infrator') as string) as uf_infrator, safe_cast( json_value(content, '$.auinf_infracao_valor') as numeric ) as valor_infracao, From 78213a9cfb29310316fc6dc87a32fc511874b84b Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 7 Jan 2025 17:30:25 -0300 Subject: [PATCH 43/54] altera captura --- pipelines/serpro/constants.py | 5 +-- pipelines/serpro/flows.py | 61 ++++++++++++++++------------------- pipelines/serpro/tasks.py | 22 ++++++++++--- 3 files changed, 49 insertions(+), 39 deletions(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 403ff20f2..54921ab65 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -16,7 +16,7 @@ class constants(Enum): # pylint: disable=c0103 AUTUACAO_MATERIALIZACAO_DATASET_ID = "transito" AUTUACAO_MATERIALIZACAO_TABLE_ID = "autuacao" - INFRACAO_PRIVATE_BUCKET = "rj-smtr-infracao-private" + INFRACAO_PRIVATE_BUCKET = "rj-smtr-dev-infracao-private" SERPRO_CAPTURE_PARAMS = { "query": """ @@ -25,7 +25,8 @@ class constants(Enum): # pylint: disable=c0103 FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view WHERE - PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd') = '{date}' + PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd') BETWEEN '{start_date}' + AND '{end_date}' """, "primary_key": ["auinf_num_auto"], "pre_treatment_reader_args": {"dtype": "object"}, diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index 6a91a77cb..32bb0b328 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -1,8 +1,7 @@ # -*- coding: utf-8 -*- -from prefect import Parameter, case +from prefect import Parameter from prefect.run_configs import KubernetesRun from prefect.storage import GCS -from prefect.tasks.control_flow import merge from prefect.utilities.edges import unmapped from prefeitura_rio.pipelines_utils.custom import Flow from prefeitura_rio.pipelines_utils.state_handlers import ( @@ -16,6 +15,7 @@ create_local_partition_path, get_current_timestamp, get_now_time, + get_previous_date, parse_timestamp_to_string, rename_current_flow_run_now_time, run_dbt_model, @@ -27,73 +27,68 @@ from pipelines.serpro.constants import constants from pipelines.serpro.tasks import get_db_object, get_raw_serpro from pipelines.serpro.utils import handler_setup_serpro -from pipelines.tasks import get_timestamp_range with Flow("SMTR: SERPRO - Captura/Tratamento") as serpro_captura: - start_date = Parameter("start_date", default=None) - end_date = Parameter("end_date", default=None) + start_date = Parameter("start_date", default=get_previous_date.run(1)) + end_date = Parameter("end_date", default=get_previous_date.run(1)) rename_flow_run = rename_current_flow_run_now_time( prefix=serpro_captura.name + " ", now_time=get_now_time(), ) - capture_timestamps = get_timestamp_range(start_date, end_date) - with case(start_date, None): - current_timestamp = get_current_timestamp() + timestamp = get_current_timestamp() - timestamps = merge(current_timestamp, capture_timestamps) - - partitions = create_date_hour_partition.map( - timestamps, + partitions = create_date_hour_partition( + timestamp, partition_date_only=unmapped(True), ) - filenames = parse_timestamp_to_string.map(timestamps) + filenames = parse_timestamp_to_string(timestamp) - local_filepaths = create_local_partition_path.map( - dataset_id=unmapped(constants.INFRACAO_DATASET_ID.value), - table_id=unmapped(constants.AUTUACAO_SERPRO_TABLE_ID.value), + local_filepaths = create_local_partition_path( + dataset_id=constants.INFRACAO_DATASET_ID.value, + table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value, partitions=partitions, filename=filenames, ) jdbc = get_db_object() - raw_filepaths = get_raw_serpro.map( - jdbc=unmapped(jdbc), timestamp=timestamps, local_filepath=local_filepaths + raw_filepaths = get_raw_serpro( + jdbc=jdbc, start_date=start_date, end_date=end_date, local_filepath=local_filepaths ) - transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map( + transform_raw_to_nested_structure_results = transform_raw_to_nested_structure( raw_filepath=raw_filepaths, filepath=local_filepaths, - primary_key=unmapped(constants.SERPRO_CAPTURE_PARAMS.value["primary_key"]), - timestamp=timestamps, - reader_args=unmapped(constants.SERPRO_CAPTURE_PARAMS.value["pre_treatment_reader_args"]), - error=unmapped(None), + primary_key=constants.SERPRO_CAPTURE_PARAMS.value["primary_key"], + timestamp=timestamp, + reader_args=constants.SERPRO_CAPTURE_PARAMS.value["pre_treatment_reader_args"], + error=None, ) errors, treated_filepaths = unpack_mapped_results_nout2( mapped_results=transform_raw_to_nested_structure_results ) - errors = upload_raw_data_to_gcs.map( - dataset_id=unmapped(constants.INFRACAO_DATASET_ID.value), - table_id=unmapped(constants.AUTUACAO_SERPRO_TABLE_ID.value), + errors = upload_raw_data_to_gcs( + dataset_id=constants.INFRACAO_DATASET_ID.value, + table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value, raw_filepath=raw_filepaths, partitions=partitions, - error=unmapped(None), - # bucket_name=unmapped(constants.INFRACAO_PRIVATE_BUCKET.value), + error=None, + bucket_name=constants.INFRACAO_PRIVATE_BUCKET.value, ) - wait_captura_true = upload_staging_data_to_gcs.map( - dataset_id=unmapped(constants.INFRACAO_DATASET_ID.value), - table_id=unmapped(constants.AUTUACAO_SERPRO_TABLE_ID.value), + wait_captura_true = upload_staging_data_to_gcs( + dataset_id=constants.INFRACAO_DATASET_ID.value, + table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value, staging_filepath=treated_filepaths, partitions=partitions, - timestamp=timestamps, + timestamp=timestamp, error=errors, - # bucket_name=unmapped(constants.INFRACAO_PRIVATE_BUCKET.value), + bucket_name=constants.INFRACAO_PRIVATE_BUCKET.value, ) wait_run_dbt_model = run_dbt_model( diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index 7f97260d5..d8fc82b6f 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- import csv -from datetime import datetime from pathlib import Path from time import sleep @@ -23,13 +22,28 @@ def get_db_object(secret_path="radar_serpro", environment: str = "dev"): @task(checkpoint=False, nout=2) def get_raw_serpro( - jdbc: JDBC, timestamp: datetime, local_filepath: str, batch_size: int = 100000 + jdbc: JDBC, start_date: str, end_date: str, local_filepath: str, batch_size: int = 100000 ) -> str: - date = timestamp.date() + """ + Task para capturar dados brutos do SERPRO com base em um intervalo de datas. + + Args: + jdbc (JDBC): Instância para execução de queries via JDBC. + start_date (str): Data de início no formato "YYYY-MM-DD". + end_date (str): Data de fim no formato "YYYY-MM-DD". + local_filepath (str): Local onde o arquivo será salvo. + batch_size (int): Tamanho do lote. + + Returns: + str: Caminho do arquivo salvo. + """ + raw_filepath = local_filepath.format(mode="raw", filetype="csv") Path(raw_filepath).parent.mkdir(parents=True, exist_ok=True) - query = constants.SERPRO_CAPTURE_PARAMS.value["query"].format(date=date.strftime("%Y-%m-%d")) + query = constants.SERPRO_CAPTURE_PARAMS.value["query"].format( + start_date=start_date, end_date=end_date + ) jdbc.execute_query(query) columns = jdbc.get_columns() From 7b6b533a1cc94c88f2e43130fe2cfe291aa1c8ae Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 7 Jan 2025 17:56:49 -0300 Subject: [PATCH 44/54] altera where --- pipelines/serpro/constants.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 54921ab65..29945a073 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -25,8 +25,9 @@ class constants(Enum): # pylint: disable=c0103 FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view WHERE - PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd') BETWEEN '{start_date}' - AND '{end_date}' + PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd') + BETWEEN PARSEDATE('{start_date}', 'yyyy-MM-dd') + AND PARSEDATE('{end_date}', 'yyyy-MM-dd') """, "primary_key": ["auinf_num_auto"], "pre_treatment_reader_args": {"dtype": "object"}, From 3956045300c77a887c4d823148a0a74aadfb20ec Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 7 Jan 2025 20:14:58 -0300 Subject: [PATCH 45/54] corrige flow --- pipelines/serpro/flows.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index 32bb0b328..ca7c261a6 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -20,7 +20,6 @@ rename_current_flow_run_now_time, run_dbt_model, transform_raw_to_nested_structure, - unpack_mapped_results_nout2, upload_raw_data_to_gcs, upload_staging_data_to_gcs, ) @@ -59,7 +58,7 @@ jdbc=jdbc, start_date=start_date, end_date=end_date, local_filepath=local_filepaths ) - transform_raw_to_nested_structure_results = transform_raw_to_nested_structure( + errors, treated_filepaths = transform_raw_to_nested_structure( raw_filepath=raw_filepaths, filepath=local_filepaths, primary_key=constants.SERPRO_CAPTURE_PARAMS.value["primary_key"], @@ -68,10 +67,6 @@ error=None, ) - errors, treated_filepaths = unpack_mapped_results_nout2( - mapped_results=transform_raw_to_nested_structure_results - ) - errors = upload_raw_data_to_gcs( dataset_id=constants.INFRACAO_DATASET_ID.value, table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value, From ccbfd190d407b4b76121cd15a604e5a0dfefd7b8 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 7 Jan 2025 20:15:12 -0300 Subject: [PATCH 46/54] altera batch_size --- pipelines/serpro/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index d8fc82b6f..e8e309e5d 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -22,7 +22,7 @@ def get_db_object(secret_path="radar_serpro", environment: str = "dev"): @task(checkpoint=False, nout=2) def get_raw_serpro( - jdbc: JDBC, start_date: str, end_date: str, local_filepath: str, batch_size: int = 100000 + jdbc: JDBC, start_date: str, end_date: str, local_filepath: str, batch_size: int = 50000 ) -> str: """ Task para capturar dados brutos do SERPRO com base em um intervalo de datas. From 0933de2a1624547832dbf52a31d141d7fa622369 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 7 Jan 2025 21:06:06 -0300 Subject: [PATCH 47/54] corrige get_raw_serpro --- pipelines/serpro/tasks.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index e8e309e5d..e1bde4968 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -22,7 +22,7 @@ def get_db_object(secret_path="radar_serpro", environment: str = "dev"): @task(checkpoint=False, nout=2) def get_raw_serpro( - jdbc: JDBC, start_date: str, end_date: str, local_filepath: str, batch_size: int = 50000 + jdbc: JDBC, start_date: str, end_date: str, local_filepath: str, batch_size: int = 100000 ) -> str: """ Task para capturar dados brutos do SERPRO com base em um intervalo de datas. @@ -48,12 +48,14 @@ def get_raw_serpro( jdbc.execute_query(query) columns = jdbc.get_columns() - rows = jdbc.fetch_batch(batch_size=batch_size) - with open(raw_filepath, "w", newline="") as csvfile: writer = csv.writer(csvfile) writer.writerow(columns) - writer.writerows(rows) + while True: + rows = jdbc.fetch_batch(batch_size=batch_size) + if not rows: + break + writer.writerows(rows) log(f"Raw data saved to: {raw_filepath}") return raw_filepath From 76d2fabad41dea9b5dbd5dd00ef31c54bf5388b8 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Wed, 8 Jan 2025 10:48:45 -0300 Subject: [PATCH 48/54] teste transform_raw_to_nested_structure_chunked --- pipelines/serpro/flows.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index ca7c261a6..d809120c1 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -19,7 +19,7 @@ parse_timestamp_to_string, rename_current_flow_run_now_time, run_dbt_model, - transform_raw_to_nested_structure, + transform_raw_to_nested_structure_chunked, upload_raw_data_to_gcs, upload_staging_data_to_gcs, ) @@ -58,13 +58,14 @@ jdbc=jdbc, start_date=start_date, end_date=end_date, local_filepath=local_filepaths ) - errors, treated_filepaths = transform_raw_to_nested_structure( + errors, treated_filepaths = transform_raw_to_nested_structure_chunked( raw_filepath=raw_filepaths, filepath=local_filepaths, primary_key=constants.SERPRO_CAPTURE_PARAMS.value["primary_key"], timestamp=timestamp, reader_args=constants.SERPRO_CAPTURE_PARAMS.value["pre_treatment_reader_args"], error=None, + chunksize=50000, ) errors = upload_raw_data_to_gcs( From 2365f0f837afcc0255a6fc39a89f4683ed69d99b Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Thu, 9 Jan 2025 16:04:47 -0300 Subject: [PATCH 49/54] =?UTF-8?q?atualiza=C3=A7=C3=A3o=20dos=20modelos=20e?= =?UTF-8?q?=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- queries/models/transito/autuacao.sql | 29 ++++-- queries/models/transito/schema.yml | 99 +++++++++++++++++-- .../transito/staging/autuacao_serpro.sql | 14 ++- .../transito/staging/aux_autuacao_id.sql | 30 ++++++ 4 files changed, 152 insertions(+), 20 deletions(-) create mode 100644 queries/models/transito/staging/aux_autuacao_id.sql diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index af67ab94b..f3567f2c2 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -14,8 +14,18 @@ with descricao_infracao as tipificacao_resumida from {{ source("infracao_staging", "infracoes_renainf") }} ), + autuacao_ids as ( + select data, id_autuacao, id_auto_infracao + from {{ ref("aux_autuacao_id") }} + {% if is_incremental() %} + where + data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) + {% endif %} + ), citran as ( - select + select distinct data, id_auto_infracao, datetime(concat(data, ' ', hora, ':00')) as datetime_autuacao, @@ -110,7 +120,7 @@ with {% endif %} ), serpro as ( - select + select distinct data, id_auto_infracao, datetime_autuacao, @@ -159,8 +169,8 @@ with ' ' ) ) - when logradouro_autuacao_2 is not null - then logradouro_autuacao_2 + when logradouro_rodovia_autuacao is not null + then logradouro_rodovia_autuacao else null end as cep_autuacao, null as tile_autuacao, @@ -181,7 +191,6 @@ with autuacao as ( select data, - to_hex(sha256(concat(generate_uuid(), id_auto_infracao))) as id_autuacao, -- trocar generate_uuid ? id_auto_infracao, datetime_autuacao, data_limite_defesa_previa, @@ -223,7 +232,6 @@ with union all select data, - to_hex(sha256(concat(generate_uuid(), id_auto_infracao))) as id_autuacao, -- trocar generate_uuid ? id_auto_infracao, datetime_autuacao, data_limite_defesa_previa, @@ -274,9 +282,9 @@ with {% endif %} ) select - data, - id_autuacao, - id_auto_infracao, + u.data, + a.id_autuacao, + u.id_auto_infracao, datetime_autuacao, data_limite_defesa_previa, data_limite_recurso, @@ -314,5 +322,6 @@ select status_sne, fonte, datetime_ultima_atualizacao -from update_partition +from update_partition as u +left join autuacao_ids as a using (id_auto_infracao) left join infracoes_renainf as i using (codigo_enquadramento) diff --git a/queries/models/transito/schema.yml b/queries/models/transito/schema.yml index 0e42cea86..39babf613 100644 --- a/queries/models/transito/schema.yml +++ b/queries/models/transito/schema.yml @@ -33,7 +33,7 @@ models: - name: uf_proprietario description: "Estado do proprietário do veículo" - name: cep_proprietario - description: "CEP do proprietário do veículo [protegido]" + description: "CEP do proprietário do veículo" - name: valor_infracao description: "Valor monetário da autuação (100%) [R$]" - name: valor_pago @@ -66,6 +66,8 @@ models: description: "Data da autuação (Coluna de particionamento)" - name: id_auto_infracao description: "Código do auto de infração" + - name: origem_auto_infracao + description: "Origem do AIT" - name: datetime_autuacao description: "Data e hora da autuação" - name: data_limite_defesa_previa @@ -78,6 +80,8 @@ models: description: "Descrição do status da infração" - name: codigo_enquadramento description: "Código da autuação" + - name: codigo_desdobramento + description: "Desdobramento do tipo de infração que junto com o código identifica exclusivamente o tipo de infração" - name: tipificacao_resumida description: "Descrição da autuação" - name: pontuacao @@ -86,12 +90,20 @@ models: description: "Descrição da gravidade da autuação" - name: amparo_legal description: "Amparo legal da autuação" + - name: velocidade_aferida + description: "Valor aferido para aplicação da infração" + - name: velocidade_considerada + description: "Valor considerado para aplicação da infração" + - name: velocidade_regulamentada + description: "Limite regulamento para aplicação da infração" - name: tipo_veiculo description: "Tipo de veículo autuado" - name: descricao_veiculo description: "Marca/modelo do veículo" - name: placa_veiculo description: "Placa do veículo" + - name: chassi_veiculo + description: "Chassi do veículo" - name: ano_fabricacao_veiculo description: "Ano de fabricação do veículo" - name: ano_modelo_veiculo @@ -100,30 +112,84 @@ models: description: "Cor do veículo" - name: especie_veiculo description: "Espécie do veículo" - - name: uf_infrator - description: "Estado do condutor infrator (em caso de indicação de real condutor infrator)" - - name: uf_principal_condutor - description: "Estado do condutor principal do veículo" + - name: uf_veiculo + description: "Sigla do estado do veículo" + - name: nome_proprietario + description: "Nome do proprietário" + - name: cpf_proprietario + description: "CPF do proprietario do veículo" + - name: cnh_proprietario + description: "Número da CNH do proprietario do veículo" + - name: uf_cnh_proprietario + description: "Sigla do estado da CNH do proprietário" - name: uf_proprietario description: "Estado do proprietário do veículo" - - name: cep_proprietario - description: "CEP do proprietário do veículo [protegido]" + - name: numero_condutor_nao_habilitado + description: "Número de identificação do documento do condutor não habilitado" + - name: nome_condutor + description: "Nome do condutor" + - name: numero_identificacao_condutor + description: "Número do documento de identificação do condutor do veículo" + - name: cnh_condutor + description: "Número da CNH do condutor do veículo" + - name: uf_principal_condutor + description: "Estado do condutor principal do veículo" + - name: nome_infrator + description: "Nome do infrator" + - name: cpf_infrator + description: "CPF do infrator" + - name: cnh_infrator + description: "CNH do infrator" + - name: uf_infrator + description: "Estado do condutor infrator (em caso de indicação de real condutor infrator)" - name: valor_infracao description: "Valor monetário da autuação (100%) [R$]" - name: valor_pago description: "Valor pago da autuação [R$]" + - name: data_pagamento + description: "Data do pagamento" + - name: codigo_banco + description: "Código do banco" + - name: nome_banco + description: "Nome do banco" + - name: status_pagamento + description: "Status do pagamento. (pago, pago a mais, pago a menos, pago em duplicidade, nao identificado, desvinculado, em aberto)" + - name: codigo_auto_infracao_renainf + description: "Código do auto de infração no sistema renainf" - name: id_autuador description: "Código do órgão autuador" - name: descricao_autuador description: "Descrição da unidade de autuação" + - name: matricula_autuador + description: "Matrícula do usuário no órgão cadastrado" - name: id_municipio_autuacao description: "Código TOM do município da autuação" - name: descricao_municipio description: "Nome do município da autuação" - name: uf_autuacao description: "Sigla do estado da autuação" + - name: bairro_autuacao + description: "Endereço da autuação - Bairro" + - name: cep_autuacao + description: "Endereço da autuação - CEP" + - name: logradouro_autuacao + description: "Endereço da autuação - Logradouro" + - name: ende_numero_autuacao + description: "Endereço da autuação - Número" + - name: complemento + description: "Endereço da autuação - Complemento" + - name: logradouro_rodovia_autuacao + description: "Endereço da autuação - Logradouro" + - name: observacao_autuacao + description: "Observação geral do auto" - name: processo_defesa_autuacao description: "Número do processo de defesa prévia" + - name: rrso_num_processo + description: "Número de processo" + - name: cpa_num_processo + description: "Número de processo" + - name: susp_num_processo + description: "Número de processo" - name: recurso_penalidade_multa description: "Número do processo de recurso contra aplicação de penalidade de multa em primeira instância" - name: processo_troca_real_infrator @@ -131,6 +197,20 @@ models: - name: status_sne description: "Indicador de adesão do veículo ao SNE" + - name: aux_autuacao_id + description: "Tabela auxiliar para gerar identificador único da autuação" + columns: + - name: data + description: "Data da autuação (Coluna de particionamento)" + - name: id_autuacao + description: "Identificador único da autuação" + - name: id_auto_infracao + description: "Código do auto de infração [protegido]" + policy_tags: + - 'projects/rj-smtr/locations/us/taxonomies/7968932463054912793/policyTags/5243840798726507169' + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" + - name: autuacao description: "Tabela com dados de autuações (até abril/2023)" columns: @@ -167,7 +247,7 @@ models: - name: descricao_veiculo description: "Marca/modelo do veículo" - name: placa_veiculo - description: "Placa do veículo" + description: "Placa do veículo [protegido]" policy_tags: - 'projects/rj-smtr/locations/us/taxonomies/7968932463054912793/policyTags/5243840798726507169' - name: ano_fabricacao_veiculo @@ -220,6 +300,9 @@ models: description: "Indicador de adesão do veículo ao Sistema de Notificação Eletrônica (SNE)" - name: fonte description: "Origem dos dados" + - name: datetime_ultima_atualizacao + description: "{{ doc('datetime_ultima_atualizacao') }}" + - name: receita_autuacao columns: - name: data diff --git a/queries/models/transito/staging/autuacao_serpro.sql b/queries/models/transito/staging/autuacao_serpro.sql index 8751d0286..31b116ecf 100644 --- a/queries/models/transito/staging/autuacao_serpro.sql +++ b/queries/models/transito/staging/autuacao_serpro.sql @@ -2,7 +2,17 @@ select - date(data) as data, + date( + parse_timestamp( + '%Y-%m-%d %H:%M:%S', + regexp_replace( + safe_cast(json_value(content, '$.auinf_dt_infracao') as string), + r'\.\d+', + '' + ) + ), + 'America/Sao_Paulo' + ) as data, auinf_num_auto as id_auto_infracao, safe_cast( json_value(content, '$.auinf_origem_desc') as string @@ -164,7 +174,7 @@ select safe_cast(json_value(content, '$.complemento') as string) as complemento, safe_cast( json_value(content, '$.auinf_local_rodovia') as string - ) as logradouro_autuacao_2, -- Deveria ser somente rodovias + ) as logradouro_rodovia_autuacao, safe_cast( json_value(content, '$.auinf_observacao') as string ) as observacao_autuacao, diff --git a/queries/models/transito/staging/aux_autuacao_id.sql b/queries/models/transito/staging/aux_autuacao_id.sql new file mode 100644 index 000000000..9924568e6 --- /dev/null +++ b/queries/models/transito/staging/aux_autuacao_id.sql @@ -0,0 +1,30 @@ +{{ + config( + materialized="incremental", + partition_by={"field": "data", "data_type": "date", "granularity": "day"}, + unique_key="id_auto_infracao", + incremental_strategy="insert_overwrite", + ) +}} + +select + s.data, + to_hex(sha256(concat(generate_uuid(), id_auto_infracao))) as id_autuacao, + id_auto_infracao, + current_datetime("America/Sao_Paulo") as datetime_ultima_atualizacao +from + ( + select distinct id_auto_infracao, data + from {{ ref("autuacao_citran") }} + union all + select distinct id_auto_infracao, data + from {{ ref("autuacao_serpro") }} + ) s +{% if is_incremental() %} + left join {{ this }} as t using (data, id_auto_infracao) + where + t.id_auto_infracao is null + and s.data between date("{{var('date_range_start')}}") and date( + "{{var('date_range_end')}}" + ) +{% endif %} From 3c8bcd64bc07876c9630c177580c064347c6402f Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Fri, 10 Jan 2025 10:26:49 -0300 Subject: [PATCH 50/54] docstrings --- pipelines/serpro/tasks.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py index e1bde4968..b04558e0b 100644 --- a/pipelines/serpro/tasks.py +++ b/pipelines/serpro/tasks.py @@ -17,6 +17,17 @@ def wait_sleeping(interval_seconds: int = 54000): @task(checkpoint=False) def get_db_object(secret_path="radar_serpro", environment: str = "dev"): + """ + Creates a JDBC object. + + Args: + secret_path (str): The path to the secret containing database credentials. + Defaults to "radar_serpro". + environment (str): The environment for the connection. Defaults to "dev". + + Returns: + JDBC: A JDBC connection object. + """ return JDBC(db_params_secret_path=secret_path, environment=environment) @@ -25,17 +36,17 @@ def get_raw_serpro( jdbc: JDBC, start_date: str, end_date: str, local_filepath: str, batch_size: int = 100000 ) -> str: """ - Task para capturar dados brutos do SERPRO com base em um intervalo de datas. + Task to fetch raw data from SERPRO based on a date range. Args: - jdbc (JDBC): Instância para execução de queries via JDBC. - start_date (str): Data de início no formato "YYYY-MM-DD". - end_date (str): Data de fim no formato "YYYY-MM-DD". - local_filepath (str): Local onde o arquivo será salvo. - batch_size (int): Tamanho do lote. + jdbc (JDBC): Instance for executing queries via JDBC. + start_date (str): Start date in the format "YYYY-MM-DD". + end_date (str): End date in the format "YYYY-MM-DD". + local_filepath (str): Path where the file will be saved. + batch_size (int): Batch size. Returns: - str: Caminho do arquivo salvo. + str: Path of the saved file. """ raw_filepath = local_filepath.format(mode="raw", filetype="csv") From 8cde4f9e192547cac27473c4eea04ad3dd351ddb Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Fri, 10 Jan 2025 10:27:20 -0300 Subject: [PATCH 51/54] ajustes para prod --- pipelines/serpro/constants.py | 2 +- queries/profiles.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 29945a073..0b3f14932 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -16,7 +16,7 @@ class constants(Enum): # pylint: disable=c0103 AUTUACAO_MATERIALIZACAO_DATASET_ID = "transito" AUTUACAO_MATERIALIZACAO_TABLE_ID = "autuacao" - INFRACAO_PRIVATE_BUCKET = "rj-smtr-dev-infracao-private" + INFRACAO_PRIVATE_BUCKET = "rj-smtr-infracao-private" SERPRO_CAPTURE_PARAMS = { "query": """ diff --git a/queries/profiles.yml b/queries/profiles.yml index 3ac185e4d..e67088d2d 100644 --- a/queries/profiles.yml +++ b/queries/profiles.yml @@ -78,4 +78,4 @@ queries: spark.executor.instances: "2" spark.driver.memory: 4g spark.driver.memoryOverhead: 1g - target: hmg \ No newline at end of file + target: prod \ No newline at end of file From a6f94bb3a55f50bda891e0c9f67ef3e8e8492648 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Fri, 10 Jan 2025 10:27:31 -0300 Subject: [PATCH 52/54] add changelogs --- pipelines/CHANGELOG.md | 6 ++++++ pipelines/serpro/CHANGELOG.md | 7 +++++++ queries/models/transito/CHANGELOG.md | 11 +++++++++++ 3 files changed, 24 insertions(+) create mode 100644 pipelines/serpro/CHANGELOG.md diff --git a/pipelines/CHANGELOG.md b/pipelines/CHANGELOG.md index 3cfbc4a67..b099b2426 100644 --- a/pipelines/CHANGELOG.md +++ b/pipelines/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - pipelines +## [1.0.2] - 2025-01-10 + +### Adicionado + +- Adiciona task `get_timestamp_range` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255) + ## [1.0.1] - 2024-12-13 ### Adicionado diff --git a/pipelines/serpro/CHANGELOG.md b/pipelines/serpro/CHANGELOG.md new file mode 100644 index 000000000..32150dccf --- /dev/null +++ b/pipelines/serpro/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - serpro + +## [1.0.0] - 2025-01-10 + +### Adicionado + +- Cria flow de captura de autuações fornecidos pelo sistema SERPRO (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255) \ No newline at end of file diff --git a/queries/models/transito/CHANGELOG.md b/queries/models/transito/CHANGELOG.md index d8c20b451..bf55f890d 100644 --- a/queries/models/transito/CHANGELOG.md +++ b/queries/models/transito/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog - infracao +## [1.0.3] - 2025-01-10 + +## Adicionado + +- Adicionada a view `autuacao_serpro` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255) +- Adicionada a tabela `aux_autuacao_id` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255) + +## Alterado + +- Alterado a tabela `autuacao` incluindo join da view `autuacao_serpro` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255) + ## [1.0.2] - 2024-09-06 ## Adicionado From 9d87e3ba661915996d2e4fa49e2b78ca098cff49 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Fri, 10 Jan 2025 11:06:36 -0300 Subject: [PATCH 53/54] altera unique_key --- queries/models/transito/autuacao.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queries/models/transito/autuacao.sql b/queries/models/transito/autuacao.sql index f3567f2c2..e24c5c434 100644 --- a/queries/models/transito/autuacao.sql +++ b/queries/models/transito/autuacao.sql @@ -2,7 +2,7 @@ config( materialized="incremental", partition_by={"field": "data", "data_type": "date", "granularity": "day"}, - unique_key="id_autuacao", + unique_key=["id_autuacao", "status_infracao", "data_pagamento"], incremental_strategy="insert_overwrite", ) }} From 87180a6d61b2793fe89cbdbbdf9e4572dde79c54 Mon Sep 17 00:00:00 2001 From: Guilherme Botelho Date: Tue, 21 Jan 2025 13:30:14 -0300 Subject: [PATCH 54/54] bucket dev para teste --- pipelines/serpro/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/serpro/constants.py b/pipelines/serpro/constants.py index 0b3f14932..29945a073 100644 --- a/pipelines/serpro/constants.py +++ b/pipelines/serpro/constants.py @@ -16,7 +16,7 @@ class constants(Enum): # pylint: disable=c0103 AUTUACAO_MATERIALIZACAO_DATASET_ID = "transito" AUTUACAO_MATERIALIZACAO_TABLE_ID = "autuacao" - INFRACAO_PRIVATE_BUCKET = "rj-smtr-infracao-private" + INFRACAO_PRIVATE_BUCKET = "rj-smtr-dev-infracao-private" SERPRO_CAPTURE_PARAMS = { "query": """