From 589d19124d24c6e4dc44bf24dd71d1c70572a979 Mon Sep 17 00:00:00 2001
From: Guilherme Botelho
Date: Thu, 26 Sep 2024 19:49:52 -0300
Subject: [PATCH] dump serpro

---
 pipelines/serpro/flows.py | 24 ++++++++++++++++++++++--
 pipelines/serpro/tasks.py | 38 ++++++++++++++++++++++++++++++++++++++
 pipelines/utils/jdbc.py   |  7 +++++++
 3 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py
index 2e0585b2..2ee47e47 100644
--- a/pipelines/serpro/flows.py
+++ b/pipelines/serpro/flows.py
@@ -1,19 +1,39 @@
 # -*- coding: utf-8 -*-
+from prefect import Parameter
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
+from prefect.utilities.edges import unmapped
 from prefeitura_rio.pipelines_utils.custom import Flow
 
 from pipelines.constants import constants as smtr_constants
-from pipelines.serpro.tasks import wait_sleeping
+from pipelines.migration.tasks import upload_raw_data_to_gcs
+from pipelines.serpro.tasks import dump_serpro, get_db_object
 from pipelines.serpro.utils import handler_setup_serpro
 
 with Flow("SMTR - Teste Conexão Serpro") as flow:
+    batch_size = Parameter("batch_size", default=10000)
     # setup_serpro()
-    wait_sleeping()
+    # wait_sleeping()
+
+    jdbc = get_db_object()
+    csv_files = dump_serpro(jdbc, batch_size)
+
+    upload_raw_data_to_gcs.map(
+        dataset_id=unmapped("radar_serpro"),
+        table_id=unmapped("tb_infracao_view"),
+        raw_filepath=csv_files,
+        partitions=unmapped(None),
+        error=unmapped(None),
+        bucket_name=unmapped("rj-smtr-dev"),
+    )
 
 flow.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
 flow.run_config = KubernetesRun(
     image=smtr_constants.DOCKER_IMAGE_FEDORA.value,
     labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
+    cpu_limit="1000m",
+    memory_limit="4600Mi",
+    cpu_request="500m",
+    memory_request="1000Mi",
 )
 flow.state_handlers = [handler_setup_serpro]

diff --git a/pipelines/serpro/tasks.py b/pipelines/serpro/tasks.py
index aec6afe3..c736a581 100644
--- a/pipelines/serpro/tasks.py
+++ b/pipelines/serpro/tasks.py
@@ -1,5 +1,8 @@
 # -*- coding: utf-8 -*-
+import csv
+import os
 from time import sleep
+from typing import List
 
 from prefect import task
 
@@ -14,3 +17,38 @@ def wait_sleeping(interval_seconds: int = 54000, wait=None):
 @task
 def get_db_object(secret_path="radar_serpro", environment: str = "dev"):
     return JDBC(db_params_secret_path=secret_path, environment=environment)
+
+
+@task
+def dump_serpro(jdbc: JDBC, batch_size: int) -> List[str]:
+
+    index = 0
+    data_folder = os.getenv("DATA_FOLDER", "data")
+    file_path = f"{os.getcwd()}/{data_folder}/raw/radar_serpro/tb_infracao_view"
+    csv_files = []
+
+    query = "SELECT * FROM dbpro_radar_view_SMTR_VBL.tb_infracao_view"
+
+    jdbc.execute_query(query)
+
+    columns = jdbc.get_columns()
+
+    while True:
+        rows = jdbc.fetch_batch(batch_size)
+
+        if not rows:
+            break
+
+        output_file = file_path + f"dados_infracao_{index}.csv"
+
+        with open(output_file, "w", newline="") as csvfile:
+            writer = csv.writer(csvfile)
+            writer.writerow(columns)
+            writer.writerows(rows)
+
+        csv_files.append(output_file)
+        index += 1
+
+    jdbc.close_connection()
+
+    return csv_files

diff --git a/pipelines/utils/jdbc.py b/pipelines/utils/jdbc.py
index 9a4e9b39..f3a85211 100644
--- a/pipelines/utils/jdbc.py
+++ b/pipelines/utils/jdbc.py
@@ -68,3 +68,10 @@ def fetch_all(self) -> List[List]:
         Fetches all rows from the JDBC database.
         """
         return [list(item) for item in self._cursor.fetchall()]
+
+    def close_connection(self):
+        """
+        Closes the JDBC connection.
+        """
+        if self._connection:
+            self._connection.close()