Skip to content

Commit

Permalink
dump serpro
Browse files Browse the repository at this point in the history
  • Loading branch information
akaBotelho committed Sep 26, 2024
1 parent 14326ff commit 589d191
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 2 deletions.
24 changes: 22 additions & 2 deletions pipelines/serpro/flows.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
# -*- coding: utf-8 -*-
from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.utilities.edges import unmapped
from prefeitura_rio.pipelines_utils.custom import Flow

from pipelines.constants import constants as smtr_constants
from pipelines.serpro.tasks import wait_sleeping
from pipelines.migration.tasks import upload_raw_data_to_gcs
from pipelines.serpro.tasks import dump_serpro, get_db_object
from pipelines.serpro.utils import handler_setup_serpro

# Flow definition: builds the JDBC helper, dumps the query result to CSV
# files via dump_serpro, and hands each resulting file path to
# upload_raw_data_to_gcs.
with Flow("SMTR - Teste Conexão Serpro") as flow:
    # Batch size forwarded to dump_serpro; overridable per run.
    batch_size = Parameter("batch_size", default=10000)
    # setup_serpro()
    # NOTE(review): this listing appears to come from a rendered diff, so the
    # active call and the commented-out call below are likely the before/after
    # of a single line — confirm which one is current.
    wait_sleeping()
    # wait_sleeping()

    jdbc = get_db_object()
    csv_files = dump_serpro(jdbc, batch_size)

    # Mapped over csv_files: one upload task run per CSV path. Every other
    # argument is wrapped in unmapped() so it is shared by all mapped runs.
    upload_raw_data_to_gcs.map(
        dataset_id=unmapped("radar_serpro"),
        table_id=unmapped("tb_infracao_view"),
        raw_filepath=csv_files,
        partitions=unmapped(None),
        error=unmapped(None),
        bucket_name=unmapped("rj-smtr-dev"),
    )

# Flow storage: GCS bucket named by the shared SMTR constants.
flow.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
# Kubernetes run configuration: image and agent label come from the shared
# constants; CPU/memory requests and limits are set explicitly for this flow.
flow.run_config = KubernetesRun(
    image=smtr_constants.DOCKER_IMAGE_FEDORA.value,
    labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
    cpu_limit="1000m",
    memory_limit="4600Mi",
    cpu_request="500m",
    memory_request="1000Mi",
)
# handler_setup_serpro is the only state handler registered on this flow.
flow.state_handlers = [handler_setup_serpro]
38 changes: 38 additions & 0 deletions pipelines/serpro/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
import csv
import os
from time import sleep
from typing import List

from prefect import task

Expand All @@ -14,3 +17,38 @@ def wait_sleeping(interval_seconds: int = 54000, wait=None):
@task
def get_db_object(secret_path="radar_serpro", environment: str = "dev"):
    """
    Builds the JDBC helper used by the flow.

    Args:
        secret_path: Forwarded to JDBC as ``db_params_secret_path``.
        environment (str): Forwarded to JDBC as ``environment``.

    Returns:
        JDBC: The newly constructed helper object.
    """
    db_object = JDBC(
        db_params_secret_path=secret_path,
        environment=environment,
    )
    return db_object


@task
def dump_serpro(jdbc: JDBC, batch_size: int) -> List[str]:
    """
    Dumps the SERPRO infraction view to local CSV files, one file per batch.

    Runs ``SELECT *`` on ``dbpro_radar_view_SMTR_VBL.tb_infracao_view`` and
    calls ``jdbc.fetch_batch(batch_size)`` until it returns an empty result.
    Each batch is written, preceded by a header row with the column names, to
    ``<cwd>/<DATA_FOLDER>/raw/radar_serpro/tb_infracao_view/dados_infracao_<n>.csv``
    (``DATA_FOLDER`` defaults to ``data``).

    The JDBC connection is closed when the dump finishes, including when it
    fails part-way through.

    Args:
        jdbc (JDBC): Database helper used to run the query and fetch rows.
        batch_size (int): Value passed to ``jdbc.fetch_batch`` on every call.

    Returns:
        List[str]: Paths of the CSV files written, in batch order. Empty when
        the query returns no rows.
    """
    data_folder = os.getenv("DATA_FOLDER", "data")
    output_dir = f"{os.getcwd()}/{data_folder}/raw/radar_serpro/tb_infracao_view"
    # The directory does not exist in a fresh working directory; without this
    # the first open() below fails with FileNotFoundError.
    os.makedirs(output_dir, exist_ok=True)

    query = "SELECT * FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view"
    csv_files = []

    try:
        jdbc.execute_query(query)
        columns = jdbc.get_columns()

        index = 0
        while True:
            rows = jdbc.fetch_batch(batch_size)
            if not rows:
                break

            # Join with a separator so the file lands inside output_dir rather
            # than next to it as "tb_infracao_viewdados_infracao_<n>.csv".
            output_file = os.path.join(output_dir, f"dados_infracao_{index}.csv")

            # Explicit UTF-8 so the output does not depend on the container's
            # locale; newline="" is what the csv module requires for writers.
            with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(columns)
                writer.writerows(rows)

            csv_files.append(output_file)
            index += 1
    finally:
        # Release the connection even if the query, a fetch or a write raised.
        jdbc.close_connection()

    return csv_files
7 changes: 7 additions & 0 deletions pipelines/utils/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,10 @@ def fetch_all(self) -> List[List]:
Fetches all rows from the JDBC database.
"""
return [list(item) for item in self._cursor.fetchall()]

def close_connection(self):
    """
    Closes the underlying JDBC connection, if there is one.

    Does nothing when ``self._connection`` is falsy.
    """
    connection = self._connection
    if not connection:
        return
    connection.close()

0 comments on commit 589d191

Please sign in to comment.