From 753b76da315c41816ee318520bcf4c640433a767 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 14 Oct 2023 13:26:17 +0000 Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../dump_api_prontuario_vitacare/main.py | 51 ++++++++++--------- .../rj_sms/dump_api_prontuario_vitai/flows.py | 8 +-- .../dump_azureblob_estoque_tpc/flows.py | 14 ++--- .../dump_azureblob_estoque_tpc/schedules.py | 2 +- pipelines/rj_sms/dump_ftp_cnes/constants.py | 1 + pipelines/rj_sms/dump_ftp_cnes/flows.py | 24 ++++----- pipelines/rj_sms/dump_ftp_cnes/schedules.py | 2 +- pipelines/rj_sms/dump_ftp_cnes/tasks.py | 32 ++++-------- pipelines/rj_sms/utils.py | 23 ++++----- 9 files changed, 70 insertions(+), 87 deletions(-) diff --git a/pipelines/rj_sms/dump_api_prontuario_vitacare/main.py b/pipelines/rj_sms/dump_api_prontuario_vitacare/main.py index 6f1d27566..e06c442af 100644 --- a/pipelines/rj_sms/dump_api_prontuario_vitacare/main.py +++ b/pipelines/rj_sms/dump_api_prontuario_vitacare/main.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- import csv import json import requests @@ -6,17 +7,17 @@ from google.cloud import storage from datetime import datetime, timedelta + @functions_framework.http def vitacare(request): - request_json = request.get_json(silent=True) request_args = request.args # Obtendo parametros da request - url = request_json.get('url') or request_args.get('url') - bucket_name = request_json.get('bucket') or request_args.get('bucket') - path = request_json.get('path') or request_args.get('path') - + url = request_json.get("url") or request_args.get("url") + bucket_name = request_json.get("bucket") or request_args.get("bucket") + path = request_json.get("path") or request_args.get("path") + if not url or not bucket_name or not path: return "Certifique-se de fornecer os parâmetros 'url', 'bucket' e 'path' na solicitação." @@ -28,24 +29,19 @@ def vitacare(request): def process_request(url, path, bucket_name): - if bucket_name == 'rj_whatsapp': - if path == 'clinica_scheduled_patients/origin/': + if bucket_name == "rj_whatsapp": + if path == "clinica_scheduled_patients/origin/": data = datetime.today() + timedelta(days=3) - elif path == 'clinica_patients_treated/origin/': + elif path == "clinica_patients_treated/origin/": data = datetime.today() - timedelta(days=1) else: - return 'Path unknown' - data_formatada = data.strftime('%Y-%m-%d') + return "Path unknown" + data_formatada = data.strftime("%Y-%m-%d") list_cnes = ["2269376"] - headers = { - 'Content-Type': 'application/json' - } + headers = {"Content-Type": "application/json"} for cnes in list_cnes: - payload = json.dumps({ - "cnes": cnes, - "date": data_formatada - }) - response = requests.request("GET", url, headers=headers, data=payload) + payload = json.dumps({"cnes": cnes, "date": data_formatada}) + response = requests.request("GET", url, headers=headers, data=payload) if response.status_code != 200: return f"A solicitação não foi bem-sucedida. Código de status: {response.status_code}" @@ -53,7 +49,7 @@ def process_request(url, path, bucket_name): json_data = response.json() df = pd.DataFrame(json_data) if df.empty: - return('DataFrame is empty!') + return "DataFrame is empty!" 
else: return df except ValueError: @@ -73,20 +69,25 @@ def process_request(url, path, bucket_name): def save_cloud_storage(df, bucket_name, path): - storage_client = storage.Client() bucket = storage_client.get_bucket(bucket_name) - if bucket_name == 'rj_whatsapp': - if path == 'clinica_scheduled_patients/origin/': + if bucket_name == "rj_whatsapp": + if path == "clinica_scheduled_patients/origin/": data = datetime.today() + timedelta(days=3) - elif path == 'clinica_patients_treated/origin/': + elif path == "clinica_patients_treated/origin/": data = datetime.today() - timedelta(days=1) - nome_arquivo = path + data + '.csv' + nome_arquivo = path + data + ".csv" else: nome_arquivo = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + ".csv" blob = bucket.blob(nome_arquivo) - csv_data = df.to_csv(sep=';', quoting=csv.QUOTE_NONNUMERIC, quotechar='"', index=False, encoding='utf-8') + csv_data = df.to_csv( + sep=";", + quoting=csv.QUOTE_NONNUMERIC, + quotechar='"', + index=False, + encoding="utf-8", + ) blob.upload_from_string(csv_data, content_type="text/csv") return f"Arquivo CSV salvo em gs://{bucket_name}/{nome_arquivo}" diff --git a/pipelines/rj_sms/dump_api_prontuario_vitai/flows.py b/pipelines/rj_sms/dump_api_prontuario_vitai/flows.py index 6aaaa43e0..3a75565d6 100644 --- a/pipelines/rj_sms/dump_api_prontuario_vitai/flows.py +++ b/pipelines/rj_sms/dump_api_prontuario_vitai/flows.py @@ -9,7 +9,9 @@ from prefect.storage import GCS from pipelines.utils.decorators import Flow from pipelines.constants import constants -from pipelines.rj_sms.dump_api_prontuario_vitai.constants import constants as vitai_constants +from pipelines.rj_sms.dump_api_prontuario_vitai.constants import ( + constants as vitai_constants, +) from pipelines.rj_sms.utils import ( create_folders, from_json_to_csv, @@ -60,7 +62,7 @@ create_partitions_task = create_partitions( data_path=create_folders_task["raw"], - partition_directory=create_folders_task["partition_directory"] + partition_directory=create_folders_task["partition_directory"], ) create_partitions_task.set_upstream(add_load_date_column_task) @@ -131,7 +133,7 @@ create_partitions_task = create_partitions( data_path=create_folders_task["raw"], - partition_directory=create_folders_task["partition_directory"] + partition_directory=create_folders_task["partition_directory"], ) create_partitions_task.set_upstream(add_load_date_column_task) diff --git a/pipelines/rj_sms/dump_azureblob_estoque_tpc/flows.py b/pipelines/rj_sms/dump_azureblob_estoque_tpc/flows.py index 02c9f42b1..b82013354 100644 --- a/pipelines/rj_sms/dump_azureblob_estoque_tpc/flows.py +++ b/pipelines/rj_sms/dump_azureblob_estoque_tpc/flows.py @@ -8,7 +8,9 @@ from prefect.storage import GCS from pipelines.utils.decorators import Flow from pipelines.constants import constants -from pipelines.rj_sms.dump_azureblob_estoque_tpc.constants import constants as tpc_constants +from pipelines.rj_sms.dump_azureblob_estoque_tpc.constants import ( + constants as tpc_constants, +) from pipelines.rj_sms.utils import ( download_azure_blob, create_folders, @@ -29,7 +31,7 @@ vault_path = tpc_constants.VAULT_PATH.value vault_key = tpc_constants.VAULT_KEY.value # Paramenters for GCP - dataset_id = tpc_constants.DATASET_ID.value + dataset_id = tpc_constants.DATASET_ID.value table_id = tpc_constants.TABLE_POSICAO_ID.value # Paramerters for Azure container_name = tpc_constants.CONTAINER_NAME.value @@ -52,14 +54,12 @@ conform_task = conform_csv_to_gcp(download_task) conform_task.set_upstream(download_task) - 
add_load_date_column_task = add_load_date_column( - input_path=download_task, sep=";" - ) + add_load_date_column_task = add_load_date_column(input_path=download_task, sep=";") add_load_date_column_task.set_upstream(conform_task) create_partitions_task = create_partitions( data_path=create_folders_task["raw"], - partition_directory=create_folders_task["partition_directory"] + partition_directory=create_folders_task["partition_directory"], ) create_partitions_task.set_upstream(add_load_date_column_task) @@ -82,4 +82,4 @@ ], ) -dump_tpc.schedule = every_day_at_six_am \ No newline at end of file +dump_tpc.schedule = every_day_at_six_am diff --git a/pipelines/rj_sms/dump_azureblob_estoque_tpc/schedules.py b/pipelines/rj_sms/dump_azureblob_estoque_tpc/schedules.py index c55c5f7e7..31f335956 100644 --- a/pipelines/rj_sms/dump_azureblob_estoque_tpc/schedules.py +++ b/pipelines/rj_sms/dump_azureblob_estoque_tpc/schedules.py @@ -19,4 +19,4 @@ ], ) ] -) \ No newline at end of file +) diff --git a/pipelines/rj_sms/dump_ftp_cnes/constants.py b/pipelines/rj_sms/dump_ftp_cnes/constants.py index 9f08b4899..c0bf87579 100644 --- a/pipelines/rj_sms/dump_ftp_cnes/constants.py +++ b/pipelines/rj_sms/dump_ftp_cnes/constants.py @@ -10,6 +10,7 @@ class constants(Enum): """ Constant values for the dump vitai flows """ + FTP_SERVER = "ftp.datasus.gov.br" FTP_FILE_PATH = "/cnes" BASE_FILE = "BASE_DE_DADOS_CNES" diff --git a/pipelines/rj_sms/dump_ftp_cnes/flows.py b/pipelines/rj_sms/dump_ftp_cnes/flows.py index a39048be9..953aed3ad 100644 --- a/pipelines/rj_sms/dump_ftp_cnes/flows.py +++ b/pipelines/rj_sms/dump_ftp_cnes/flows.py @@ -9,11 +9,7 @@ from pipelines.utils.decorators import Flow from pipelines.constants import constants from pipelines.rj_sms.dump_ftp_cnes.constants import constants as cnes_constants -from pipelines.rj_sms.utils import ( - download_ftp, - create_folders, - unzip_file -) +from pipelines.rj_sms.utils import download_ftp, create_folders, unzip_file from pipelines.rj_sms.dump_ftp_cnes.tasks import ( check_newest_file_version, conform_csv_to_gcp, @@ -40,7 +36,8 @@ user="", password="", directory=ftp_file_path, - file_name=base_file) + file_name=base_file, + ) create_folders_task = create_folders() create_folders_task.set_upstream(check_newest_file_version_task) @@ -51,13 +48,12 @@ password="", directory=ftp_file_path, file_name=check_newest_file_version_task["file"], - output_path=create_folders_task["raw"] + output_path=create_folders_task["raw"], ) download_task.set_upstream(create_folders_task) unzip_task = unzip_file( - file_path=download_task, - output_path=create_folders_task["raw"] + file_path=download_task, output_path=create_folders_task["raw"] ) unzip_task.set_upstream(download_task) @@ -66,14 +62,14 @@ add_multiple_date_column_task = add_multiple_date_column( directory=create_folders_task["raw"], - snapshot_date=check_newest_file_version_task['snapshot'], - sep=";") + snapshot_date=check_newest_file_version_task["snapshot"], + sep=";", + ) add_multiple_date_column_task.set_upstream(conform_task) upload_to_datalake_task = upload_multiple_tables_to_datalake( - path_files=conform_task, - dataset_id=dataset_id, - dump_mode="overwrite") + path_files=conform_task, dataset_id=dataset_id, dump_mode="overwrite" + ) upload_to_datalake_task.set_upstream(add_multiple_date_column_task) dump_cnes.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/rj_sms/dump_ftp_cnes/schedules.py b/pipelines/rj_sms/dump_ftp_cnes/schedules.py index 5c6d4b794..a41ea84bc 100644 --- 
a/pipelines/rj_sms/dump_ftp_cnes/schedules.py +++ b/pipelines/rj_sms/dump_ftp_cnes/schedules.py @@ -19,4 +19,4 @@ ], ) ] -) \ No newline at end of file +) diff --git a/pipelines/rj_sms/dump_ftp_cnes/tasks.py b/pipelines/rj_sms/dump_ftp_cnes/tasks.py index 43899516b..92f20c162 100644 --- a/pipelines/rj_sms/dump_ftp_cnes/tasks.py +++ b/pipelines/rj_sms/dump_ftp_cnes/tasks.py @@ -13,18 +13,12 @@ from prefect import task from pipelines.utils.utils import log from pipelines.rj_sms.dump_ftp_cnes.constants import constants -from pipelines.rj_sms.utils import ( - list_files_ftp, - upload_to_datalake) +from pipelines.rj_sms.utils import list_files_ftp, upload_to_datalake @task def check_newest_file_version( - host: str, - user: str, - password: str, - directory: str, - file_name: str + host: str, user: str, password: str, directory: str, file_name: str ): """ Check the newest version of a file in a given FTP directory. @@ -69,7 +63,7 @@ def conform_csv_to_gcp(directory: str): List[str]: A list of filepaths of the conformed CSV files. """ # list all csv files in the directory - csv_files = [f for f in os.listdir(directory) if f.endswith('.csv')] + csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")] log(f"Conforming {len(csv_files)} files...") @@ -81,9 +75,9 @@ def conform_csv_to_gcp(directory: str): filepath = os.path.join(directory, csv_file) # create a temporary file - with tempfile.NamedTemporaryFile(mode='w', delete=False) as tf: + with tempfile.NamedTemporaryFile(mode="w", delete=False) as tf: # open the original file in read mode - with open(filepath, 'r', encoding='iso8859-1') as f: + with open(filepath, "r", encoding="iso8859-1") as f: # read the first line first_line = f.readline() @@ -108,9 +102,7 @@ def conform_csv_to_gcp(directory: str): @task def upload_multiple_tables_to_datalake( - path_files: str, - dataset_id: str, - dump_mode: str + path_files: str, dataset_id: str, dump_mode: str ): """ Uploads multiple tables to datalake. @@ -124,7 +116,6 @@ def upload_multiple_tables_to_datalake( None """ for n, file in enumerate(path_files): - log(f"Uploading {n+1}/{len(path_files)} files to datalake...") # retrieve file name from path @@ -142,16 +133,12 @@ def upload_multiple_tables_to_datalake( csv_delimiter=";", if_storage_data_exists="replace", biglake_table=True, - dump_mode=dump_mode + dump_mode=dump_mode, ) @task -def add_multiple_date_column( - directory: str, - sep=";", - snapshot_date=None -): +def add_multiple_date_column(directory: str, sep=";", snapshot_date=None): """ Adds date metadata columns to all CSV files in a given directory. 
@@ -164,11 +151,10 @@ def add_multiple_date_column( now = datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S") # list all csv files in the directory - csv_files = [f for f in os.listdir(directory) if f.endswith('.csv')] + csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")] # iterate over each csv file for n, csv_file in enumerate(csv_files): - log(f"Adding date metadata to {n+1}/{len(csv_files)} files ...") # construct the full file path diff --git a/pipelines/rj_sms/utils.py b/pipelines/rj_sms/utils.py index 34cb46db2..6d3061d73 100644 --- a/pipelines/rj_sms/utils.py +++ b/pipelines/rj_sms/utils.py @@ -177,9 +177,7 @@ def download_azure_blob( # Save the API data to a local file if add_load_date_to_filename: if load_date is None: - destination_file_path = ( - f"{file_folder}/{file_name}_{str(date.today())}.csv" - ) + destination_file_path = f"{file_folder}/{file_name}_{str(date.today())}.csv" else: destination_file_path = f"{file_folder}/{file_name}_{load_date}.csv" else: @@ -201,7 +199,7 @@ def download_ftp( password: str, directory: str, file_name: str, - output_path: str + output_path: str, ): """ Downloads a file from an FTP server and saves it to the specified output path. @@ -226,7 +224,7 @@ def download_ftp( ftp.login(user, password) # Get the size of the file - ftp.voidcmd('TYPE I') + ftp.voidcmd("TYPE I") total_size = ftp.size(file_path) # Create a callback function to be called when each block is read @@ -235,15 +233,15 @@ def callback(block): downloaded_size += len(block) percent_complete = (downloaded_size / total_size) * 100 if percent_complete // 5 > (downloaded_size - len(block)) // (total_size / 20): - log(f'Download is {percent_complete:.0f}% complete') + log(f"Download is {percent_complete:.0f}% complete") f.write(block) # Initialize the downloaded size downloaded_size = 0 # Download the file - with open(output_path, 'wb') as f: - ftp.retrbinary(f'RETR {file_path}', callback) + with open(output_path, "wb") as f: + ftp.retrbinary(f"RETR {file_path}", callback) # Close the connection ftp.quit() @@ -265,7 +263,7 @@ def download_url(url: str, file_name: str, file_folder: str) -> str: str: The full path to the downloaded file. """ file_path = os.path.join(file_folder, file_name) - with open(file_path, 'wb') as f: + with open(file_path, "wb") as f: c = pycurl.Curl() c.setopt(c.URL, url) c.setopt(c.WRITEDATA, f) @@ -312,7 +310,7 @@ def unzip_file(file_path: str, output_path: str): Returns: str: The path to the unzipped file. """ - with zipfile.ZipFile(file_path, 'r') as zip_ref: + with zipfile.ZipFile(file_path, "r") as zip_ref: zip_ref.extractall(output_path) log(f"File unzipped to {output_path}") @@ -464,7 +462,7 @@ def upload_to_datalake( csv_delimiter: str = ";", if_storage_data_exists: str = "replace", biglake_table: bool = True, - dump_mode: str = "append" + dump_mode: str = "append", ): """ Uploads data from a file to a BigQuery table in a specified dataset. @@ -524,8 +522,7 @@ def upload_to_datalake( ) # pylint: disable=C0301 tb.delete(mode="all") log( - "MODE OVERWRITE: Sucessfully DELETED TABLE:\n" - f"{table_staging}\n" + "MODE OVERWRITE: Sucessfully DELETED TABLE:\n" f"{table_staging}\n" ) # pylint: disable=C0301 tb.create(
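
Note on reproducing these fixes: the hook set itself is not part of this patch, so the configuration below is only a sketch of a .pre-commit-config.yaml that would produce auto-fixes of this kind (black's double-quote normalization, trailing commas and line wrapping; the added "# -*- coding: utf-8 -*-" pragma; the restored end-of-file newlines in schedules.py and flows.py). The repository URLs and hook ids are real pre-commit hooks, but the revisions are assumptions, not taken from this repository.

    # Hypothetical configuration -- the repository's actual .pre-commit-config.yaml is not shown in this patch
    repos:
      - repo: https://github.com/pre-commit/pre-commit-hooks
        rev: v4.5.0                  # assumed revision
        hooks:
          - id: end-of-file-fixer    # restores the missing final newlines seen in schedules.py / flows.py
          - id: trailing-whitespace  # would explain the otherwise-identical changed lines
          - id: fix-encoding-pragma  # inserts "# -*- coding: utf-8 -*-" as in main.py
      - repo: https://github.com/psf/black
        rev: 23.9.1                  # assumed revision
        hooks:
          - id: black                # quote normalization, trailing commas, line wrapping

With a configuration along these lines, running "pre-commit run --all-files" locally should yield the same changes that pre-commit.ci committed here.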