[WIP] SERPRO capture flow #255

Open
wants to merge 82 commits into
base: main
Changes from 80 commits
786b1f4
create autuacao_serpro model
akaBotelho Oct 2, 2024
0aeb6da
add serpro CTE
akaBotelho Oct 2, 2024
d2a1ee7
create get_timestamp_range task
akaBotelho Oct 2, 2024
594e314
create serpro_captura flow
akaBotelho Oct 2, 2024
923613f
fix timestamp type
akaBotelho Oct 2, 2024
c5d34f5
change query
akaBotelho Oct 2, 2024
32d33b5
change where clause
akaBotelho Oct 2, 2024
3d03c01
add mkdir
akaBotelho Oct 2, 2024
2e52ca3
add bucket_name
akaBotelho Oct 2, 2024
4137132
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 3, 2024
989e6d9
fix where clause
akaBotelho Oct 3, 2024
127854c
Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
akaBotelho Oct 3, 2024
c486e63
add dbt deps
akaBotelho Oct 3, 2024
807f7bb
increase max_retries and retry_delay of get_db_object
akaBotelho Oct 3, 2024
88acb22
increase max_retries and retry_delay of get_db_object
akaBotelho Oct 3, 2024
76e6c20
increase retry_delay of get_db_object
akaBotelho Oct 3, 2024
3ced5d2
add path to setup_serpro
akaBotelho Oct 3, 2024
d244193
add workdir
akaBotelho Oct 3, 2024
ce0e9b9
Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
akaBotelho Oct 3, 2024
84685dd
restore setup_serpro
akaBotelho Oct 3, 2024
4317bd5
revert max_retries and retry_delay of get_db_object
akaBotelho Oct 3, 2024
59acf79
import subprocess
akaBotelho Oct 3, 2024
0cec8e0
add pre_treatment_reader_args
akaBotelho Oct 3, 2024
fce39c3
comment out bucket_name
akaBotelho Oct 3, 2024
b4ce077
add exception
akaBotelho Oct 3, 2024
16c3bed
change get_db_object
akaBotelho Oct 3, 2024
4b48791
dev test
akaBotelho Oct 3, 2024
7a97bc7
hmg
akaBotelho Oct 3, 2024
e69b9a5
change models
akaBotelho Oct 3, 2024
f1560f9
upstream
akaBotelho Oct 3, 2024
47c016f
fix CTE serpro.cep_proprietario
akaBotelho Oct 3, 2024
9307a50
add SAFE_CAST
akaBotelho Oct 3, 2024
c78f7d0
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 4, 2024
0189928
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 4, 2024
1aa7059
standardize data
akaBotelho Oct 4, 2024
54de0b9
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 6, 2024
7b7d76b
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 7, 2024
7cc1ffb
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 8, 2024
d4eeafa
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 9, 2024
d71e4b6
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 9, 2024
cd71ff9
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 9, 2024
d354197
add serpro columns
akaBotelho Oct 9, 2024
8d988ab
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 9, 2024
60cb5f5
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 10, 2024
ecdc3e7
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 16, 2024
4bdf1ab
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 16, 2024
8ad0c7f
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 16, 2024
197a490
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 17, 2024
e946468
Merge branch 'main' into staging/flow-serpro
mergify[bot] Oct 21, 2024
0e6215f
Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
akaBotelho Dec 16, 2024
52e8052
add batch_size
akaBotelho Dec 16, 2024
0404450
Merge branch 'main' into staging/flow-serpro
akaBotelho Dec 16, 2024
213ae37
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 16, 2024
30ab64b
target dev
akaBotelho Dec 16, 2024
73df6bf
Merge branch 'main' into staging/flow-serpro
mergify[bot] Dec 16, 2024
0215648
Merge branch 'main' into staging/flow-serpro
mergify[bot] Dec 16, 2024
5686c0d
change target to hmg
akaBotelho Jan 6, 2025
b96ad20
add source infracoes_renainf
akaBotelho Jan 6, 2025
672d170
add new columns
akaBotelho Jan 6, 2025
b11da43
add join infracoes_renainf
akaBotelho Jan 6, 2025
047345d
Merge branch 'main' into staging/flow-serpro
akaBotelho Jan 6, 2025
7cc3368
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 6, 2025
d703c63
Merge branch 'main' into staging/flow-serpro
mergify[bot] Jan 6, 2025
bf47bc9
change where clause for testing
akaBotelho Jan 7, 2025
10f12e8
Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
akaBotelho Jan 7, 2025
29d5b5c
Merge branch 'main' into staging/flow-serpro
mergify[bot] Jan 7, 2025
5851055
change where clause for testing
akaBotelho Jan 7, 2025
088e372
Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
akaBotelho Jan 7, 2025
2414013
update models
akaBotelho Jan 7, 2025
78213a9
change capture
akaBotelho Jan 7, 2025
7b6b533
change where clause
akaBotelho Jan 7, 2025
3956045
fix flow
akaBotelho Jan 7, 2025
ccbfd19
change batch_size
akaBotelho Jan 7, 2025
0933de2
fix get_raw_serpro
akaBotelho Jan 8, 2025
76d2fab
test transform_raw_to_nested_structure_chunked
akaBotelho Jan 8, 2025
2365f0f
update models and schema
akaBotelho Jan 9, 2025
3c8bcd6
docstrings
akaBotelho Jan 10, 2025
8cde4f9
adjustments for prod
akaBotelho Jan 10, 2025
a6f94bb
add changelogs
akaBotelho Jan 10, 2025
9d87e3b
change unique_key
akaBotelho Jan 10, 2025
52c2959
Merge branch 'main' into staging/flow-serpro
mergify[bot] Jan 21, 2025
87180a6
dev bucket for testing
akaBotelho Jan 21, 2025
3 changes: 3 additions & 0 deletions Dockerfile-fedora
@@ -23,3 +23,6 @@ WORKDIR /app
 COPY . .
 RUN python3.10 -m pip install --prefer-binary --no-cache-dir -U .
 RUN python3 -m pip install jaydebeapi
+WORKDIR /app/queries
+RUN dbt deps
+WORKDIR /app
6 changes: 6 additions & 0 deletions pipelines/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog - pipelines

+## [1.0.2] - 2025-01-10
+
+### Added
+
+- Add the `get_timestamp_range` task (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255)
+
 ## [1.0.1] - 2024-12-13

 ### Added
7 changes: 7 additions & 0 deletions pipelines/serpro/CHANGELOG.md
@@ -0,0 +1,7 @@
+# Changelog - serpro
+
+## [1.0.0] - 2025-01-10
+
+### Added
+
+- Create the capture flow for traffic citations (autuações) provided by the SERPRO system (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255)
34 changes: 34 additions & 0 deletions pipelines/serpro/constants.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+"""
+Constant values for rj_smtr serpro
+"""
+
+from enum import Enum
+
+
+class constants(Enum):  # pylint: disable=c0103
+    """
+    Constant values for rj_smtr serpro
+    """
+
+    INFRACAO_DATASET_ID = "infracao"
+    AUTUACAO_SERPRO_TABLE_ID = "autuacao_serpro"
+    AUTUACAO_MATERIALIZACAO_DATASET_ID = "transito"
+    AUTUACAO_MATERIALIZACAO_TABLE_ID = "autuacao"
+
+    INFRACAO_PRIVATE_BUCKET = "rj-smtr-infracao-private"
+
+    SERPRO_CAPTURE_PARAMS = {
+        "query": """
+            SELECT
+                *
+            FROM
+                dbpro_radar_view_SMTR_VBL.tb_infracao_view
+            WHERE
+                PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd')
+                BETWEEN PARSEDATE('{start_date}', 'yyyy-MM-dd')
+                AND PARSEDATE('{end_date}', 'yyyy-MM-dd')
+        """,
+        "primary_key": ["auinf_num_auto"],
+        "pre_treatment_reader_args": {"dtype": "object"},
+    }
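The capture query is a string template whose `{start_date}` and `{end_date}` placeholders are filled with `str.format`. The sketch below shows one way the dates could be validated before interpolation. `QUERY_TEMPLATE` is a shortened copy of the query above, and `build_query` is a hypothetical helper that is not part of this PR.

```python
from datetime import date

# Shortened copy of SERPRO_CAPTURE_PARAMS["query"]; the table and column names
# come from the diff, the layout is illustrative.
QUERY_TEMPLATE = """
SELECT *
FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view
WHERE PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd')
    BETWEEN PARSEDATE('{start_date}', 'yyyy-MM-dd')
    AND PARSEDATE('{end_date}', 'yyyy-MM-dd')
"""


def build_query(start_date: str, end_date: str) -> str:
    """Hypothetical helper: validate both dates, then fill the template."""
    # fromisoformat raises ValueError for text that is not an ISO date, which
    # also keeps arbitrary text out of the interpolated SQL.
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    if start > end:
        raise ValueError("start_date must not be after end_date")
    # isoformat() always serialises back to YYYY-MM-DD.
    return QUERY_TEMPLATE.format(
        start_date=start.isoformat(), end_date=end.isoformat()
    )


if __name__ == "__main__":
    print(build_query("2025-01-01", "2025-01-02"))
```

Parsing the inputs first means a malformed parameter fails before any query reaches the database.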
102 changes: 95 additions & 7 deletions pipelines/serpro/flows.py
@@ -1,19 +1,107 @@
 # -*- coding: utf-8 -*-
+from prefect import Parameter
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
+from prefect.utilities.edges import unmapped
 from prefeitura_rio.pipelines_utils.custom import Flow
+from prefeitura_rio.pipelines_utils.state_handlers import (
+    handler_initialize_sentry,
+    handler_inject_bd_credentials,
+)

 from pipelines.constants import constants as smtr_constants
-from pipelines.serpro.tasks import wait_sleeping
+from pipelines.migration.tasks import (
+    create_date_hour_partition,
+    create_local_partition_path,
+    get_current_timestamp,
+    get_now_time,
+    get_previous_date,
+    parse_timestamp_to_string,
+    rename_current_flow_run_now_time,
+    run_dbt_model,
+    transform_raw_to_nested_structure_chunked,
+    upload_raw_data_to_gcs,
+    upload_staging_data_to_gcs,
+)
+from pipelines.serpro.constants import constants
+from pipelines.serpro.tasks import get_db_object, get_raw_serpro
 from pipelines.serpro.utils import handler_setup_serpro

-with Flow("SMTR - Teste Conexão Serpro") as flow:
-    # setup_serpro()
-    wait_sleeping()
+with Flow("SMTR: SERPRO - Captura/Tratamento") as serpro_captura:
+    start_date = Parameter("start_date", default=get_previous_date.run(1))
+    end_date = Parameter("end_date", default=get_previous_date.run(1))
+
+    rename_flow_run = rename_current_flow_run_now_time(
+        prefix=serpro_captura.name + " ",
+        now_time=get_now_time(),
+    )
+
+    timestamp = get_current_timestamp()
+
+    partitions = create_date_hour_partition(
+        timestamp,
+        partition_date_only=unmapped(True),

Review comment (Contributor Author): remove `unmapped`

+    )
+
+    filenames = parse_timestamp_to_string(timestamp)
+
+    local_filepaths = create_local_partition_path(
+        dataset_id=constants.INFRACAO_DATASET_ID.value,
+        table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value,
+        partitions=partitions,
+        filename=filenames,
+    )
+
+    jdbc = get_db_object()
+
+    raw_filepaths = get_raw_serpro(
+        jdbc=jdbc, start_date=start_date, end_date=end_date, local_filepath=local_filepaths
+    )
+
+    errors, treated_filepaths = transform_raw_to_nested_structure_chunked(
+        raw_filepath=raw_filepaths,
+        filepath=local_filepaths,
+        primary_key=constants.SERPRO_CAPTURE_PARAMS.value["primary_key"],
+        timestamp=timestamp,
+        reader_args=constants.SERPRO_CAPTURE_PARAMS.value["pre_treatment_reader_args"],
+        error=None,
+        chunksize=50000,
+    )
+
+    errors = upload_raw_data_to_gcs(
+        dataset_id=constants.INFRACAO_DATASET_ID.value,
+        table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value,
+        raw_filepath=raw_filepaths,
+        partitions=partitions,
+        error=None,
+        bucket_name=constants.INFRACAO_PRIVATE_BUCKET.value,
+    )
+
+    wait_captura_true = upload_staging_data_to_gcs(
+        dataset_id=constants.INFRACAO_DATASET_ID.value,
+        table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value,
+        staging_filepath=treated_filepaths,
+        partitions=partitions,
+        timestamp=timestamp,
+        error=errors,
+        bucket_name=constants.INFRACAO_PRIVATE_BUCKET.value,
+    )
+
+    wait_run_dbt_model = run_dbt_model(
+        dataset_id=constants.AUTUACAO_MATERIALIZACAO_DATASET_ID.value,
+        table_id=constants.AUTUACAO_MATERIALIZACAO_TABLE_ID.value,
+        _vars=[{"date_range_start": start_date, "date_range_end": end_date}],
+        upstream=True,
+        upstream_tasks=[wait_captura_true],
+    )

-flow.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
-flow.run_config = KubernetesRun(
+serpro_captura.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
+serpro_captura.run_config = KubernetesRun(
     image=smtr_constants.DOCKER_IMAGE_FEDORA.value,
     labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
 )
-flow.state_handlers = [handler_setup_serpro]
+serpro_captura.state_handlers = [
+    handler_setup_serpro,
+    handler_inject_bd_credentials,
+    handler_initialize_sentry,
+]
60 changes: 58 additions & 2 deletions pipelines/serpro/tasks.py
@@ -1,16 +1,72 @@
 # -*- coding: utf-8 -*-
+import csv
+from pathlib import Path
 from time import sleep

 from prefect import task

+from pipelines.serpro.constants import constants
 from pipelines.utils.jdbc import JDBC
+from pipelines.utils.utils import log


 @task
-def wait_sleeping(interval_seconds: int = 54000, wait=None):
+def wait_sleeping(interval_seconds: int = 54000):
     sleep(interval_seconds)


-@task
+@task(checkpoint=False)
 def get_db_object(secret_path="radar_serpro", environment: str = "dev"):
+    """
+    Creates a JDBC object.
+
+    Args:
+        secret_path (str): The path to the secret containing database credentials.
+            Defaults to "radar_serpro".
+        environment (str): The environment for the connection. Defaults to "dev".
+
+    Returns:
+        JDBC: A JDBC connection object.
+    """
     return JDBC(db_params_secret_path=secret_path, environment=environment)


+@task(checkpoint=False)
+def get_raw_serpro(
+    jdbc: JDBC, start_date: str, end_date: str, local_filepath: str, batch_size: int = 100000
+) -> str:
+    """
+    Task to fetch raw data from SERPRO based on a date range.
+
+    Args:
+        jdbc (JDBC): Instance for executing queries via JDBC.
+        start_date (str): Start date in the format "YYYY-MM-DD".
+        end_date (str): End date in the format "YYYY-MM-DD".
+        local_filepath (str): Path where the file will be saved.
+        batch_size (int): Number of rows fetched and written per batch.
+
+    Returns:
+        str: Path of the saved file.
+    """
+
+    raw_filepath = local_filepath.format(mode="raw", filetype="csv")
+    Path(raw_filepath).parent.mkdir(parents=True, exist_ok=True)
+
+    query = constants.SERPRO_CAPTURE_PARAMS.value["query"].format(
+        start_date=start_date, end_date=end_date
+    )
+
+    jdbc.execute_query(query)
+    columns = jdbc.get_columns()
+
+    with open(raw_filepath, "w", newline="") as csvfile:
+        writer = csv.writer(csvfile)
+        writer.writerow(columns)
+        while True:
+            rows = jdbc.fetch_batch(batch_size=batch_size)
+            if not rows:
+                break
+            writer.writerows(rows)
+
+    log(f"Raw data saved to: {raw_filepath}")
+    return raw_filepath
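The batched fetch-and-write loop in `get_raw_serpro` can be tried without a SERPRO connection. The following is a minimal sketch that swaps the project's `JDBC` wrapper for the standard library's `sqlite3` cursor, whose `fetchmany` plays the role of `fetch_batch`. The helper names and the `valor` column are assumptions made for illustration only.

```python
import csv
import sqlite3
import tempfile
from pathlib import Path


def dump_query_to_csv(cursor, csv_path: Path, batch_size: int = 2) -> int:
    """Stream an already-executed cursor to CSV in batches; return the row count."""
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    total = 0
    with open(csv_path, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        # DB-API cursors expose the column name as the first item of each
        # description entry.
        writer.writerow([col[0] for col in cursor.description])
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            writer.writerows(rows)
            total += len(rows)
    return total


def demo():
    """Build a tiny in-memory table and dump it; returns (row_count, csv_rows)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE infracao (auinf_num_auto TEXT, valor TEXT)")
    conn.executemany(
        "INSERT INTO infracao VALUES (?, ?)",
        [("A1", "10"), ("A2", "20"), ("A3", "30")],
    )
    cursor = conn.execute("SELECT * FROM infracao ORDER BY auinf_num_auto")
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "raw" / "data.csv"
        count = dump_query_to_csv(cursor, path)
        with open(path, newline="") as f:
            written = list(csv.reader(f))
    conn.close()
    return count, written


if __name__ == "__main__":
    print(demo())
```

Fetching in fixed-size batches keeps memory bounded by `batch_size` rows rather than by the size of the full result set.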
17 changes: 13 additions & 4 deletions pipelines/serpro/utils.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-import os
+# import os
+import subprocess

 from prefect.engine.state import State

@@ -9,11 +10,19 @@

 def setup_serpro(secret_path: str = "radar_serpro"):
     data = get_secret(secret_path=secret_path)["setup.sh"]
-    log("Got Secret")
-    os.popen("touch setup.sh")
+
+    subprocess.run(["touch", "setup.sh"])
     with open("setup.sh", "w") as f:
         f.write(data)
-    return os.popen("sh setup.sh")
+
+    # capture_output=True populates result.stderr; without it the attribute is None
+    result = subprocess.run(["sh", "setup.sh"], capture_output=True, text=True)
+    if result.returncode == 0:
+        log("setup.sh ran successfully")
+    else:
+        raise Exception(f"Error executing setup.sh: {result.stderr}")
+
+    return result


 def handler_setup_serpro(obj, old_state: State, new_state: State) -> State:
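`setup_serpro` reports `result.stderr` when the script fails, and `subprocess.run` only fills that attribute when output capture is requested. The sketch below shows the pattern in isolation. `run_script` is a hypothetical helper, and the Python interpreter stands in for `sh setup.sh` so the example runs on any platform.

```python
import subprocess
import sys


def run_script(args: list) -> subprocess.CompletedProcess:
    """Run a command and raise with its stderr if it exits non-zero."""
    # capture_output=True fills result.stdout and result.stderr; text=True
    # decodes them to str instead of bytes.
    result = subprocess.run(args, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"Command failed ({result.returncode}): {result.stderr.strip()}"
        )
    return result


if __name__ == "__main__":
    ok = run_script([sys.executable, "-c", "print('ok')"])
    print(ok.stdout.strip())
```

Passing `check=True` to `subprocess.run` is an alternative that raises `CalledProcessError` automatically, at the cost of a less specific message.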
28 changes: 26 additions & 2 deletions pipelines/tasks.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """Module containing general purpose tasks"""
-from datetime import datetime
-from typing import Any, Union
+from datetime import datetime, timedelta
+from typing import Any, List, Union

 import prefect
 from prefect import task
@@ -220,6 +220,30 @@ def run_subflow(
         raise FailedSubFlow(failed_message)


+@task
+def get_timestamp_range(start_date: str = None, end_date: str = None) -> List[datetime]:
+    """
+    Generates a list of all days between two given dates (inclusive).
+
+    Args:
+        start_date (str): The start date as a string in the format 'YYYY-MM-DD'.
+        end_date (str): The end date as a string in the format 'YYYY-MM-DD'.
+    """
+
+    if start_date is None or end_date is None:
+        return None
+
+    start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
+    end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")
+
+    timestamps = []
+    while start_date_dt <= end_date_dt:
+        timestamps.append(start_date_dt)
+        start_date_dt += timedelta(days=1)
+
+    return timestamps
+
+
 @task(trigger=all_finished)
 def check_fail(results: Union[list, str]):
     """
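The inclusive day range that `get_timestamp_range` describes can be exercised as a plain function, without the Prefect `@task` decorator. This is a minimal sketch that assumes the `None` check is meant to run before any parsing; `date_range` is a hypothetical name.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def date_range(
    start_date: Optional[str], end_date: Optional[str]
) -> Optional[List[datetime]]:
    """Return every day from start_date to end_date inclusive, or None."""
    # Guard first: strptime raises TypeError when handed None.
    if start_date is None or end_date is None:
        return None

    current = datetime.strptime(start_date, "%Y-%m-%d")
    last = datetime.strptime(end_date, "%Y-%m-%d")

    days = []
    while current <= last:
        days.append(current)
        current += timedelta(days=1)
    return days


if __name__ == "__main__":
    for day in date_range("2025-01-01", "2025-01-03"):
        print(day.date())
```

A start date later than the end date yields an empty list, since the loop body never runs.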
3 changes: 3 additions & 0 deletions queries/models/sources.yml
@@ -165,7 +165,9 @@ sources:

     tables:
       - name: autuacoes_citran
+      - name: autuacoes_serpro
       - name: receita_autuacao
+      - name: infracoes_renainf

   - name: dados_mestres
     database: datario
@@ -190,6 +192,7 @@ sources:

     tables:
       - name: viagem_informada
+
   - name: source_smtr
     database: rj-smtr-dev

11 changes: 11 additions & 0 deletions queries/models/transito/CHANGELOG.md
@@ -1,5 +1,16 @@
 # Changelog - infracao

+## [1.0.3] - 2025-01-10
+
+## Added
+
+- Added the `autuacao_serpro` view (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255)
+- Added the `aux_autuacao_id` table (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255)
+
+## Changed
+
+- Changed the `autuacao` table to include a join with the `autuacao_serpro` view (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255)
+
 ## [1.0.2] - 2024-09-06

 ## Added