diff --git a/pipelines/rj_smtr/veiculo/tasks.py b/pipelines/rj_smtr/veiculo/tasks.py index 3a5c595ea..273fd6089 100644 --- a/pipelines/rj_smtr/veiculo/tasks.py +++ b/pipelines/rj_smtr/veiculo/tasks.py @@ -2,6 +2,7 @@ """ Tasks for veiculos """ +import os from pathlib import Path import traceback import zipfile @@ -248,18 +249,41 @@ def pre_treatment_sppo_infracao(status: dict, timestamp: datetime): @task -def get_raw_ftp(ftp_path: str, filetype: str, csv_args: dict, timestamp: datetime): +def get_raw_ftp( + ftp_path: str, + filetype: str, + csv_args: dict, + timestamp: datetime, +): + """ + Retrieves raw data from an FTP server. + + Args: + ftp_path (str): The path to the file on the FTP server. + filetype (str): The file extension of the raw data file. + csv_args (dict): Additional arguments to be passed to the `pd.read_csv` function. + timestamp (datetime): The timestamp used to construct the file name. + + Returns: + dict: A dictionary containing the retrieved data and any error messages. + The 'data' key holds the retrieved data as a list of dictionaries. + The 'error' key holds any error message encountered during the retrieval process. + """ data = None error = None try: - ftp_path = f"{ftp_path}_{timestamp.strftime('%Y%m%d')}.{filetype}" - ftp_client = connect_ftp(constants.RDO_FTPS_SECRET_PATH.value) - buffer = io.BytesIO() - ftp_client.retrbinary("RETR " + ftp_path, buffer.write) - buffer.seek(0) - if filetype in ("csv", "txt"): - data = pd.read_csv(buffer, **csv_args) + ftp_client = connect_ftp(constants.RDO_FTPS_SECRET_PATH.value) + data = pd.read_csv( + io.StringIO( + ftp_client.retrbinary( + f"RETR {ftp_path}_{timestamp.strftime('%Y%m%d')}.{filetype}", + io.BytesIO().write, + ) + ), + **csv_args, + ).to_dict(orient="records") + ftp_client.quit() else: error = "Unsupported raw file extension. Supported only: csv and txt"