[WIP] SERPRO capture flow #255
Open
akaBotelho wants to merge 82 commits into main from staging/flow-serpro
82 commits
786b1f4 akaBotelho: create autuacao_serpro model
0aeb6da akaBotelho: add serpro CTE
d2a1ee7 akaBotelho: create get_timestamp_range task
594e314 akaBotelho: create serpro_captura flow
923613f akaBotelho: fix timestamp type
c5d34f5 akaBotelho: change query
32d33b5 akaBotelho: change where clause
3d03c01 akaBotelho: add mkdir
2e52ca3 akaBotelho: add bucket_name
4137132 mergify[bot]: Merge branch 'main' into staging/flow-serpro
989e6d9 akaBotelho: fix where clause
127854c akaBotelho: Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
c486e63 akaBotelho: add dbt deps
807f7bb akaBotelho: increase max_retries and retry_delay of get_db_object
88acb22 akaBotelho: increase max_retries and retry_delay of get_db_object
76e6c20 akaBotelho: increase retry_delay of get_db_object
3ced5d2 akaBotelho: add path to setup_serpro
d244193 akaBotelho: add workdir
ce0e9b9 akaBotelho: Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
84685dd akaBotelho: restore setup_serpro
4317bd5 akaBotelho: restore max_retries and retry_delay of get_db_object
59acf79 akaBotelho: import subprocess
0cec8e0 akaBotelho: add pre_treatment_reader_args
fce39c3 akaBotelho: comment out bucket_name
b4ce077 akaBotelho: add exception
16c3bed akaBotelho: change get_db_object
4b48791 akaBotelho: dev test
7a97bc7 akaBotelho: hmg
e69b9a5 akaBotelho: change models
f1560f9 akaBotelho: upstream
47c016f akaBotelho: fix CTE serpro.cep_proprietario
9307a50 akaBotelho: add SAFE_CAST
c78f7d0 mergify[bot]: Merge branch 'main' into staging/flow-serpro
0189928 mergify[bot]: Merge branch 'main' into staging/flow-serpro
1aa7059 akaBotelho: standardize data
54de0b9 mergify[bot]: Merge branch 'main' into staging/flow-serpro
7b7d76b mergify[bot]: Merge branch 'main' into staging/flow-serpro
7cc1ffb mergify[bot]: Merge branch 'main' into staging/flow-serpro
d4eeafa mergify[bot]: Merge branch 'main' into staging/flow-serpro
d71e4b6 mergify[bot]: Merge branch 'main' into staging/flow-serpro
cd71ff9 mergify[bot]: Merge branch 'main' into staging/flow-serpro
d354197 akaBotelho: add serpro columns
8d988ab mergify[bot]: Merge branch 'main' into staging/flow-serpro
60cb5f5 mergify[bot]: Merge branch 'main' into staging/flow-serpro
ecdc3e7 mergify[bot]: Merge branch 'main' into staging/flow-serpro
4bdf1ab mergify[bot]: Merge branch 'main' into staging/flow-serpro
8ad0c7f mergify[bot]: Merge branch 'main' into staging/flow-serpro
197a490 mergify[bot]: Merge branch 'main' into staging/flow-serpro
e946468 mergify[bot]: Merge branch 'main' into staging/flow-serpro
0e6215f akaBotelho: Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
52e8052 akaBotelho: add batch_size
0404450 akaBotelho: Merge branch 'main' into staging/flow-serpro
213ae37 pre-commit-ci[bot]: [pre-commit.ci] auto fixes from pre-commit.com hooks
30ab64b akaBotelho: target dev
73df6bf mergify[bot]: Merge branch 'main' into staging/flow-serpro
0215648 mergify[bot]: Merge branch 'main' into staging/flow-serpro
5686c0d akaBotelho: change target to hmg
b96ad20 akaBotelho: add source infracoes_renainf
672d170 akaBotelho: add new columns
b11da43 akaBotelho: add join infracoes_renainf
047345d akaBotelho: Merge branch 'main' into staging/flow-serpro
7cc3368 pre-commit-ci[bot]: [pre-commit.ci] auto fixes from pre-commit.com hooks
d703c63 mergify[bot]: Merge branch 'main' into staging/flow-serpro
bf47bc9 akaBotelho: change where clause for testing
10f12e8 akaBotelho: Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
29d5b5c mergify[bot]: Merge branch 'main' into staging/flow-serpro
5851055 akaBotelho: change where clause for testing
088e372 akaBotelho: Merge branch 'staging/flow-serpro' of https://github.com/prefeitura-r…
2414013 akaBotelho: update models
78213a9 akaBotelho: change capture
7b6b533 akaBotelho: change where clause
3956045 akaBotelho: fix flow
ccbfd19 akaBotelho: change batch_size
0933de2 akaBotelho: fix get_raw_serpro
76d2fab akaBotelho: test transform_raw_to_nested_structure_chunked
2365f0f akaBotelho: update models and schema
3c8bcd6 akaBotelho: docstrings
8cde4f9 akaBotelho: adjustments for prod
a6f94bb akaBotelho: add changelogs
9d87e3b akaBotelho: change unique_key
52c2959 mergify[bot]: Merge branch 'main' into staging/flow-serpro
87180a6 akaBotelho: dev bucket for testing
@@ -0,0 +1,7 @@
+# Changelog - serpro
+
+## [1.0.0] - 2025-01-10
+
+### Added
+
+- Creates the capture flow for traffic citations (autuações) provided by the SERPRO system (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/255)
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+"""
+Constant values for rj_smtr serpro
+"""
+
+from enum import Enum
+
+
+class constants(Enum):  # pylint: disable=c0103
+    """
+    Constant values for rj_smtr serpro
+    """
+
+    INFRACAO_DATASET_ID = "infracao"
+    AUTUACAO_SERPRO_TABLE_ID = "autuacao_serpro"
+    AUTUACAO_MATERIALIZACAO_DATASET_ID = "transito"
+    AUTUACAO_MATERIALIZACAO_TABLE_ID = "autuacao"
+
+    INFRACAO_PRIVATE_BUCKET = "rj-smtr-infracao-private"
+
+    SERPRO_CAPTURE_PARAMS = {
+        "query": """
+            SELECT
+                *
+            FROM
+                dbpro_radar_view_SMTR_VBL.tb_infracao_view
+            WHERE
+                PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd')
+                BETWEEN PARSEDATE('{start_date}', 'yyyy-MM-dd')
+                AND PARSEDATE('{end_date}', 'yyyy-MM-dd')
+        """,
+        "primary_key": ["auinf_num_auto"],
+        "pre_treatment_reader_args": {"dtype": "object"},
+    }
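The capture query is a plain string template whose `{start_date}` and `{end_date}` placeholders are filled with `str.format`. The sketch below shows one way the template could be filled while rejecting malformed dates first. `build_query` is a hypothetical helper, not part of this PR, and the template is copied from the constants file; treat it as an illustration under those assumptions rather than the project's implementation.

```python
from datetime import date

# Query template copied from SERPRO_CAPTURE_PARAMS["query"] in this PR.
QUERY_TEMPLATE = """
SELECT
    *
FROM
    dbpro_radar_view_SMTR_VBL.tb_infracao_view
WHERE
    PARSEDATE(SUBSTRING(auinf_dt_infracao, 1, 10), 'yyyy-MM-dd')
    BETWEEN PARSEDATE('{start_date}', 'yyyy-MM-dd')
    AND PARSEDATE('{end_date}', 'yyyy-MM-dd')
"""


def build_query(start_date: str, end_date: str) -> str:
    """Hypothetical helper: validate both dates, then fill the template."""
    # Parsing and re-serializing means only a well-formed YYYY-MM-DD string
    # ever reaches the SQL text; anything else raises ValueError.
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    if start > end:
        raise ValueError(f"start_date {start} is after end_date {end}")
    return QUERY_TEMPLATE.format(
        start_date=start.isoformat(), end_date=end.isoformat()
    )


query = build_query("2025-01-09", "2025-01-10")
```

Because the dates are interpolated into the SQL text rather than bound as parameters, validating them before formatting is a cheap guard when the values come from flow parameters.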
@@ -1,19 +1,107 @@
 # -*- coding: utf-8 -*-
+from prefect import Parameter
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
+from prefect.utilities.edges import unmapped
 from prefeitura_rio.pipelines_utils.custom import Flow
+from prefeitura_rio.pipelines_utils.state_handlers import (
+    handler_initialize_sentry,
+    handler_inject_bd_credentials,
+)
 
 from pipelines.constants import constants as smtr_constants
-from pipelines.serpro.tasks import wait_sleeping
+from pipelines.migration.tasks import (
+    create_date_hour_partition,
+    create_local_partition_path,
+    get_current_timestamp,
+    get_now_time,
+    get_previous_date,
+    parse_timestamp_to_string,
+    rename_current_flow_run_now_time,
+    run_dbt_model,
+    transform_raw_to_nested_structure_chunked,
+    upload_raw_data_to_gcs,
+    upload_staging_data_to_gcs,
+)
+from pipelines.serpro.constants import constants
+from pipelines.serpro.tasks import get_db_object, get_raw_serpro
 from pipelines.serpro.utils import handler_setup_serpro
 
-with Flow("SMTR - Teste Conexão Serpro") as flow:
-    # setup_serpro()
-    wait_sleeping()
+with Flow("SMTR: SERPRO - Captura/Tratamento") as serpro_captura:
+    start_date = Parameter("start_date", default=get_previous_date.run(1))
+    end_date = Parameter("end_date", default=get_previous_date.run(1))
+
+    rename_flow_run = rename_current_flow_run_now_time(
+        prefix=serpro_captura.name + " ",
+        now_time=get_now_time(),
+    )
+
+    timestamp = get_current_timestamp()
+
+    partitions = create_date_hour_partition(
+        timestamp,
+        partition_date_only=unmapped(True),
+    )
+
+    filenames = parse_timestamp_to_string(timestamp)
+
+    local_filepaths = create_local_partition_path(
+        dataset_id=constants.INFRACAO_DATASET_ID.value,
+        table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value,
+        partitions=partitions,
+        filename=filenames,
+    )
+
+    jdbc = get_db_object()
+
+    raw_filepaths = get_raw_serpro(
+        jdbc=jdbc, start_date=start_date, end_date=end_date, local_filepath=local_filepaths
+    )
+
+    errors, treated_filepaths = transform_raw_to_nested_structure_chunked(
+        raw_filepath=raw_filepaths,
+        filepath=local_filepaths,
+        primary_key=constants.SERPRO_CAPTURE_PARAMS.value["primary_key"],
+        timestamp=timestamp,
+        reader_args=constants.SERPRO_CAPTURE_PARAMS.value["pre_treatment_reader_args"],
+        error=None,
+        chunksize=50000,
+    )
+
+    errors = upload_raw_data_to_gcs(
+        dataset_id=constants.INFRACAO_DATASET_ID.value,
+        table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value,
+        raw_filepath=raw_filepaths,
+        partitions=partitions,
+        error=None,
+        bucket_name=constants.INFRACAO_PRIVATE_BUCKET.value,
+    )
+
+    wait_captura_true = upload_staging_data_to_gcs(
+        dataset_id=constants.INFRACAO_DATASET_ID.value,
+        table_id=constants.AUTUACAO_SERPRO_TABLE_ID.value,
+        staging_filepath=treated_filepaths,
+        partitions=partitions,
+        timestamp=timestamp,
+        error=errors,
+        bucket_name=constants.INFRACAO_PRIVATE_BUCKET.value,
+    )
+
+    wait_run_dbt_model = run_dbt_model(
+        dataset_id=constants.AUTUACAO_MATERIALIZACAO_DATASET_ID.value,
+        table_id=constants.AUTUACAO_MATERIALIZACAO_TABLE_ID.value,
+        _vars=[{"date_range_start": start_date, "date_range_end": end_date}],
+        upstream=True,
+        upstream_tasks=[wait_captura_true],
+    )
 
-flow.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
-flow.run_config = KubernetesRun(
+serpro_captura.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
+serpro_captura.run_config = KubernetesRun(
     image=smtr_constants.DOCKER_IMAGE_FEDORA.value,
     labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
 )
-flow.state_handlers = [handler_setup_serpro]
+serpro_captura.state_handlers = [
+    handler_setup_serpro,
+    handler_inject_bd_credentials,
+    handler_initialize_sentry,
+]
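One detail worth illustrating in the flow definition: `get_previous_date.run(1)` is called while the flow module itself is being executed, so each `Parameter` default is a concrete value computed at that moment. Depending on how the flow is stored and loaded, that may be the registration date rather than the date of the run. The sketch below shows one possible alternative, resolving the default inside the run instead. `resolve_capture_date` is a hypothetical helper, not part of this PR, and it assumes dates are `YYYY-MM-DD` strings, as the `get_raw_serpro` docstring states.

```python
from datetime import date, timedelta
from typing import Optional


def resolve_capture_date(
    value: Optional[str], today: date, days_back: int = 1
) -> str:
    """Hypothetical helper: return `value` if it was supplied,
    otherwise `today - days_back` formatted as YYYY-MM-DD."""
    if value is not None:
        return value
    return (today - timedelta(days=days_back)).isoformat()


# With no explicit parameter, the date is derived from the run's own "today".
default_date = resolve_capture_date(None, today=date(2025, 1, 10))
# An explicit parameter always wins.
explicit_date = resolve_capture_date("2024-12-31", today=date(2025, 1, 10))
```

Under this approach the parameter default would be `None`, and a task wrapping the helper would compute the date at run time.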
@@ -1,16 +1,72 @@
 # -*- coding: utf-8 -*-
+import csv
+from pathlib import Path
 from time import sleep
 
 from prefect import task
 
+from pipelines.serpro.constants import constants
 from pipelines.utils.jdbc import JDBC
+from pipelines.utils.utils import log
 
 
 @task
-def wait_sleeping(interval_seconds: int = 54000, wait=None):
+def wait_sleeping(interval_seconds: int = 54000):
     sleep(interval_seconds)
 
 
-@task
+@task(checkpoint=False)
 def get_db_object(secret_path="radar_serpro", environment: str = "dev"):
+    """
+    Creates a JDBC object.
+
+    Args:
+        secret_path (str): The path to the secret containing database credentials.
+            Defaults to "radar_serpro".
+        environment (str): The environment for the connection. Defaults to "dev".
+
+    Returns:
+        JDBC: A JDBC connection object.
+    """
     return JDBC(db_params_secret_path=secret_path, environment=environment)
+
+
+@task(checkpoint=False, nout=2)
+def get_raw_serpro(
+    jdbc: JDBC, start_date: str, end_date: str, local_filepath: str, batch_size: int = 100000
+) -> str:
+    """
+    Task to fetch raw data from SERPRO based on a date range.
+
+    Args:
+        jdbc (JDBC): Instance for executing queries via JDBC.
+        start_date (str): Start date in the format "YYYY-MM-DD".
+        end_date (str): End date in the format "YYYY-MM-DD".
+        local_filepath (str): Path where the file will be saved.
+        batch_size (int): Batch size.
+
+    Returns:
+        str: Path of the saved file.
+    """
+
+    raw_filepath = local_filepath.format(mode="raw", filetype="csv")
+    Path(raw_filepath).parent.mkdir(parents=True, exist_ok=True)
+
+    query = constants.SERPRO_CAPTURE_PARAMS.value["query"].format(
+        start_date=start_date, end_date=end_date
+    )
+
+    jdbc.execute_query(query)
+    columns = jdbc.get_columns()
+
+    with open(raw_filepath, "w", newline="") as csvfile:
+        writer = csv.writer(csvfile)
+        writer.writerow(columns)
+        while True:
+            rows = jdbc.fetch_batch(batch_size=batch_size)
+            if not rows:
+                break
+            writer.writerows(rows)
+
+    log(f"Raw data saved to: {raw_filepath}")
+    return raw_filepath
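`get_raw_serpro` streams the result set to disk in batches instead of loading it all into memory. The project's `JDBC` wrapper (`execute_query`, `get_columns`, `fetch_batch`) is not shown in this PR, so the sketch below reproduces the same loop with the standard library's `sqlite3` cursor and its `fetchmany` method as a stand-in. `dump_query_to_csv` and the two-column table are assumptions made for illustration only.

```python
import csv
import sqlite3
import tempfile
from pathlib import Path


def dump_query_to_csv(cursor, raw_filepath: str, batch_size: int = 100000) -> str:
    """Write the rows of an already-executed DB-API cursor to a CSV file,
    fetching at most `batch_size` rows per round trip."""
    Path(raw_filepath).parent.mkdir(parents=True, exist_ok=True)
    # The first element of each description entry is the column name.
    columns = [col[0] for col in cursor.description]
    with open(raw_filepath, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(columns)
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:  # an empty batch means the result set is exhausted
                break
            writer.writerows(rows)
    return raw_filepath


# Usage: an in-memory SQLite table stands in for the SERPRO view.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tb_infracao_view (auinf_num_auto TEXT, auinf_dt_infracao TEXT)"
)
conn.executemany(
    "INSERT INTO tb_infracao_view VALUES (?, ?)",
    [(f"A{i}", "2025-01-09") for i in range(5)],
)
cursor = conn.execute("SELECT * FROM tb_infracao_view ORDER BY auinf_num_auto")
out_path = dump_query_to_csv(
    cursor, str(Path(tempfile.mkdtemp()) / "raw" / "data.csv"), batch_size=2
)
```

With five rows and a batch size of two, the loop performs three non-empty fetches followed by one empty fetch that ends it, so memory use is bounded by the batch size rather than by the size of the result set.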
remove unmapped