[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
pre-commit-ci[bot] committed Oct 14, 2023
1 parent 3ccc624 commit 753b76d
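
This commit was generated automatically by pre-commit.ci running the repository's formatting hooks. The changes below are mechanical rewrites: strings normalized to double quotes, short dict and call literals collapsed onto one line, long calls wrapped with trailing commas (black-style formatting), plus an added encoding declaration, which typically comes from a separate pre-commit hook rather than from black itself. As a minimal sketch of the formatting part only, the snippet below runs black programmatically on a fragment resembling the original code; it assumes black is installed locally, and the repository's actual hook configuration is not shown on this page.

import black

# Hypothetical fragment resembling the pre-formatting code in main.py below.
source = (
    "url = request_json.get('url') or request_args.get('url')\n"
    "headers = {\n"
    "    'Content-Type': 'application/json'\n"
    "}\n"
)

# black normalizes quotes to double quotes and collapses the short dict
# literal onto one line, matching the kind of edits applied in this commit.
print(black.format_str(source, mode=black.Mode()))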
Showing 9 changed files with 70 additions and 87 deletions.
51 changes: 26 additions & 25 deletions pipelines/rj_sms/dump_api_prontuario_vitacare/main.py
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
import csv
import json
import requests
@@ -6,17 +7,17 @@
from google.cloud import storage
from datetime import datetime, timedelta


@functions_framework.http
def vitacare(request):

request_json = request.get_json(silent=True)
request_args = request.args

# Obtendo parametros da request
url = request_json.get('url') or request_args.get('url')
bucket_name = request_json.get('bucket') or request_args.get('bucket')
path = request_json.get('path') or request_args.get('path')
url = request_json.get("url") or request_args.get("url")
bucket_name = request_json.get("bucket") or request_args.get("bucket")
path = request_json.get("path") or request_args.get("path")

if not url or not bucket_name or not path:
return "Certifique-se de fornecer os parâmetros 'url', 'bucket' e 'path' na solicitação."

@@ -28,32 +29,27 @@ def vitacare(request):


def process_request(url, path, bucket_name):
if bucket_name == 'rj_whatsapp':
if path == 'clinica_scheduled_patients/origin/':
if bucket_name == "rj_whatsapp":
if path == "clinica_scheduled_patients/origin/":
data = datetime.today() + timedelta(days=3)
elif path == 'clinica_patients_treated/origin/':
elif path == "clinica_patients_treated/origin/":
data = datetime.today() - timedelta(days=1)
else:
return 'Path unknown'
data_formatada = data.strftime('%Y-%m-%d')
return "Path unknown"
data_formatada = data.strftime("%Y-%m-%d")
list_cnes = ["2269376"]
headers = {
'Content-Type': 'application/json'
}
headers = {"Content-Type": "application/json"}
for cnes in list_cnes:
payload = json.dumps({
"cnes": cnes,
"date": data_formatada
})
response = requests.request("GET", url, headers=headers, data=payload)
payload = json.dumps({"cnes": cnes, "date": data_formatada})
response = requests.request("GET", url, headers=headers, data=payload)
if response.status_code != 200:
return f"A solicitação não foi bem-sucedida. Código de status: {response.status_code}"

try:
json_data = response.json()
df = pd.DataFrame(json_data)
if df.empty:
return('DataFrame is empty!')
return "DataFrame is empty!"
else:
return df
except ValueError:
@@ -73,20 +69,25 @@ def process_request(url, path, bucket_name):


def save_cloud_storage(df, bucket_name, path):

storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

if bucket_name == 'rj_whatsapp':
if path == 'clinica_scheduled_patients/origin/':
if bucket_name == "rj_whatsapp":
if path == "clinica_scheduled_patients/origin/":
data = datetime.today() + timedelta(days=3)
elif path == 'clinica_patients_treated/origin/':
elif path == "clinica_patients_treated/origin/":
data = datetime.today() - timedelta(days=1)
nome_arquivo = path + data + '.csv'
nome_arquivo = path + data + ".csv"
else:
nome_arquivo = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + ".csv"
blob = bucket.blob(nome_arquivo)
csv_data = df.to_csv(sep=';', quoting=csv.QUOTE_NONNUMERIC, quotechar='"', index=False, encoding='utf-8')
csv_data = df.to_csv(
sep=";",
quoting=csv.QUOTE_NONNUMERIC,
quotechar='"',
index=False,
encoding="utf-8",
)
blob.upload_from_string(csv_data, content_type="text/csv")

return f"Arquivo CSV salvo em gs://{bucket_name}/{nome_arquivo}"
8 changes: 5 additions & 3 deletions pipelines/rj_sms/dump_api_prontuario_vitai/flows.py
@@ -9,7 +9,9 @@
from prefect.storage import GCS
from pipelines.utils.decorators import Flow
from pipelines.constants import constants
from pipelines.rj_sms.dump_api_prontuario_vitai.constants import constants as vitai_constants
from pipelines.rj_sms.dump_api_prontuario_vitai.constants import (
constants as vitai_constants,
)
from pipelines.rj_sms.utils import (
create_folders,
from_json_to_csv,
@@ -60,7 +62,7 @@

create_partitions_task = create_partitions(
data_path=create_folders_task["raw"],
partition_directory=create_folders_task["partition_directory"]
partition_directory=create_folders_task["partition_directory"],
)
create_partitions_task.set_upstream(add_load_date_column_task)

@@ -131,7 +133,7 @@

create_partitions_task = create_partitions(
data_path=create_folders_task["raw"],
partition_directory=create_folders_task["partition_directory"]
partition_directory=create_folders_task["partition_directory"],
)
create_partitions_task.set_upstream(add_load_date_column_task)

14 changes: 7 additions & 7 deletions pipelines/rj_sms/dump_azureblob_estoque_tpc/flows.py
@@ -8,7 +8,9 @@
from prefect.storage import GCS
from pipelines.utils.decorators import Flow
from pipelines.constants import constants
from pipelines.rj_sms.dump_azureblob_estoque_tpc.constants import constants as tpc_constants
from pipelines.rj_sms.dump_azureblob_estoque_tpc.constants import (
constants as tpc_constants,
)
from pipelines.rj_sms.utils import (
download_azure_blob,
create_folders,
@@ -29,7 +31,7 @@
vault_path = tpc_constants.VAULT_PATH.value
vault_key = tpc_constants.VAULT_KEY.value
# Paramenters for GCP
dataset_id = tpc_constants.DATASET_ID.value
dataset_id = tpc_constants.DATASET_ID.value
table_id = tpc_constants.TABLE_POSICAO_ID.value
# Paramerters for Azure
container_name = tpc_constants.CONTAINER_NAME.value
@@ -52,14 +54,12 @@
conform_task = conform_csv_to_gcp(download_task)
conform_task.set_upstream(download_task)

add_load_date_column_task = add_load_date_column(
input_path=download_task, sep=";"
)
add_load_date_column_task = add_load_date_column(input_path=download_task, sep=";")
add_load_date_column_task.set_upstream(conform_task)

create_partitions_task = create_partitions(
data_path=create_folders_task["raw"],
partition_directory=create_folders_task["partition_directory"]
partition_directory=create_folders_task["partition_directory"],
)
create_partitions_task.set_upstream(add_load_date_column_task)

@@ -82,4 +82,4 @@
],
)

dump_tpc.schedule = every_day_at_six_am
dump_tpc.schedule = every_day_at_six_am
2 changes: 1 addition & 1 deletion pipelines/rj_sms/dump_azureblob_estoque_tpc/schedules.py
@@ -19,4 +19,4 @@
],
)
]
)
)
1 change: 1 addition & 0 deletions pipelines/rj_sms/dump_ftp_cnes/constants.py
@@ -10,6 +10,7 @@ class constants(Enum):
"""
Constant values for the dump vitai flows
"""

FTP_SERVER = "ftp.datasus.gov.br"
FTP_FILE_PATH = "/cnes"
BASE_FILE = "BASE_DE_DADOS_CNES"
24 changes: 10 additions & 14 deletions pipelines/rj_sms/dump_ftp_cnes/flows.py
@@ -9,11 +9,7 @@
from pipelines.utils.decorators import Flow
from pipelines.constants import constants
from pipelines.rj_sms.dump_ftp_cnes.constants import constants as cnes_constants
from pipelines.rj_sms.utils import (
download_ftp,
create_folders,
unzip_file
)
from pipelines.rj_sms.utils import download_ftp, create_folders, unzip_file
from pipelines.rj_sms.dump_ftp_cnes.tasks import (
check_newest_file_version,
conform_csv_to_gcp,
@@ -40,7 +36,8 @@
user="",
password="",
directory=ftp_file_path,
file_name=base_file)
file_name=base_file,
)

create_folders_task = create_folders()
create_folders_task.set_upstream(check_newest_file_version_task)
@@ -51,13 +48,12 @@
password="",
directory=ftp_file_path,
file_name=check_newest_file_version_task["file"],
output_path=create_folders_task["raw"]
output_path=create_folders_task["raw"],
)
download_task.set_upstream(create_folders_task)

unzip_task = unzip_file(
file_path=download_task,
output_path=create_folders_task["raw"]
file_path=download_task, output_path=create_folders_task["raw"]
)
unzip_task.set_upstream(download_task)

@@ -66,14 +62,14 @@

add_multiple_date_column_task = add_multiple_date_column(
directory=create_folders_task["raw"],
snapshot_date=check_newest_file_version_task['snapshot'],
sep=";")
snapshot_date=check_newest_file_version_task["snapshot"],
sep=";",
)
add_multiple_date_column_task.set_upstream(conform_task)

upload_to_datalake_task = upload_multiple_tables_to_datalake(
path_files=conform_task,
dataset_id=dataset_id,
dump_mode="overwrite")
path_files=conform_task, dataset_id=dataset_id, dump_mode="overwrite"
)
upload_to_datalake_task.set_upstream(add_multiple_date_column_task)

dump_cnes.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
2 changes: 1 addition & 1 deletion pipelines/rj_sms/dump_ftp_cnes/schedules.py
@@ -19,4 +19,4 @@
],
)
]
)
)
32 changes: 9 additions & 23 deletions pipelines/rj_sms/dump_ftp_cnes/tasks.py
@@ -13,18 +13,12 @@
from prefect import task
from pipelines.utils.utils import log
from pipelines.rj_sms.dump_ftp_cnes.constants import constants
from pipelines.rj_sms.utils import (
list_files_ftp,
upload_to_datalake)
from pipelines.rj_sms.utils import list_files_ftp, upload_to_datalake


@task
def check_newest_file_version(
host: str,
user: str,
password: str,
directory: str,
file_name: str
host: str, user: str, password: str, directory: str, file_name: str
):
"""
Check the newest version of a file in a given FTP directory.
@@ -69,7 +63,7 @@ def conform_csv_to_gcp(directory: str):
List[str]: A list of filepaths of the conformed CSV files.
"""
# list all csv files in the directory
csv_files = [f for f in os.listdir(directory) if f.endswith('.csv')]
csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")]

log(f"Conforming {len(csv_files)} files...")

@@ -81,9 +75,9 @@ def conform_csv_to_gcp(directory: str):
filepath = os.path.join(directory, csv_file)

# create a temporary file
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tf:
with tempfile.NamedTemporaryFile(mode="w", delete=False) as tf:
# open the original file in read mode
with open(filepath, 'r', encoding='iso8859-1') as f:
with open(filepath, "r", encoding="iso8859-1") as f:
# read the first line
first_line = f.readline()

@@ -108,9 +102,7 @@

@task
def upload_multiple_tables_to_datalake(
path_files: str,
dataset_id: str,
dump_mode: str
path_files: str, dataset_id: str, dump_mode: str
):
"""
Uploads multiple tables to datalake.
@@ -124,7 +116,6 @@ def upload_multiple_tables_to_datalake(
None
"""
for n, file in enumerate(path_files):

log(f"Uploading {n+1}/{len(path_files)} files to datalake...")

# retrieve file name from path
@@ -142,16 +133,12 @@ def upload_multiple_tables_to_datalake(
csv_delimiter=";",
if_storage_data_exists="replace",
biglake_table=True,
dump_mode=dump_mode
dump_mode=dump_mode,
)


@task
def add_multiple_date_column(
directory: str,
sep=";",
snapshot_date=None
):
def add_multiple_date_column(directory: str, sep=";", snapshot_date=None):
"""
Adds date metadata columns to all CSV files in a given directory.
@@ -164,11 +151,10 @@ def add_multiple_date_column(
now = datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")

# list all csv files in the directory
csv_files = [f for f in os.listdir(directory) if f.endswith('.csv')]
csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")]

# iterate over each csv file
for n, csv_file in enumerate(csv_files):

log(f"Adding date metadata to {n+1}/{len(csv_files)} files ...")
# construct the full file path
