diff --git a/code_owners.yaml b/code_owners.yaml index f2f563c5f..775494551 100644 --- a/code_owners.yaml +++ b/code_owners.yaml @@ -20,6 +20,8 @@ pipelines: - fernandascovino - eng-rodrigocunha - borismarinho + - pixuimpou + - lingsv rj_escritorio: owners: - gabriel-milan diff --git a/pipelines/constants.py b/pipelines/constants.py index 309325d35..900e2ebf9 100644 --- a/pipelines/constants.py +++ b/pipelines/constants.py @@ -138,4 +138,12 @@ class constants(Enum): # pylint: disable=c0103 "user_id": "369657115012366336", "type": "user_nickname", }, + "rafaelpinheiro": { + "user_id": "1131538976101109772", + "type": "user_nickname", + }, + "carolinagomes": { + "user_id": "620000269392019469", + "type": "user_nickname", + }, } diff --git a/pipelines/rj_escritorio/__init__.py b/pipelines/rj_escritorio/__init__.py index 0813a42ba..a5c864245 100644 --- a/pipelines/rj_escritorio/__init__.py +++ b/pipelines/rj_escritorio/__init__.py @@ -18,6 +18,7 @@ from pipelines.rj_escritorio.waze.flows import * from pipelines.rj_escritorio.geolocator.flows import * from pipelines.rj_escritorio.inea.flows import * +from pipelines.rj_escritorio.dump_ftp_inea.flows import * from pipelines.rj_escritorio.seconserva_buracos_refresh_data.flows import * from pipelines.rj_escritorio.dump_url_turismo.flows import * from pipelines.rj_escritorio.dump_policy_matrix.flows import * diff --git a/pipelines/rj_escritorio/dump_ftp_inea/flows.py b/pipelines/rj_escritorio/dump_ftp_inea/flows.py new file mode 100644 index 000000000..b60de657b --- /dev/null +++ b/pipelines/rj_escritorio/dump_ftp_inea/flows.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +""" +Dumping data from INEA FTP to BigQuery +""" +# pylint: disable=E1101,C0103,bad-continuation + +from copy import deepcopy + +from prefect import Parameter +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefect.utilities.edges import unmapped + +from pipelines.constants import constants +from pipelines.rj_escritorio.dump_ftp_inea.tasks import ( + get_ftp_client, + get_files_datalake, + get_files_from_ftp, + download_files, + select_files_to_download, + upload_file_to_gcs, +) +from pipelines.rj_escritorio.dump_ftp_inea.schedules import ( + every_5_minutes, + every_5_minutes_mac, + every_1_day, + every_1_day_mac, +) +from pipelines.rj_cor.tasks import get_on_redis, save_on_redis +from pipelines.utils.decorators import Flow + + +with Flow( + "INEA: Captura FTP dados de radar (Guaratiba)", code_owners=["paty"] +) as inea_ftp_radar_flow: + bucket_name = Parameter("bucket_name", default="rj-escritorio-dev", required=False) + date = Parameter("date", default=None, required=False) + get_only_last_file = Parameter("get_only_last_file", default=True, required=False) + greater_than = Parameter("greater_than", default=None, required=False) + check_datalake_files = Parameter( + "check_datalake_files", default=True, required=False + ) + prefix = Parameter( + "prefix", default="raw/meio_ambiente_clima/inea_radar_hdf5", required=False + ) + mode = Parameter("mode", default="prod", required=False) + radar = Parameter("radar", default="gua", required=False) + product = Parameter("product", default="ppi", required=False) + + client = get_ftp_client() + + files = get_files_from_ftp( + client=client, + radar=radar, + ) + + redis_files = get_on_redis( + dataset_id="meio_ambiente_clima", + table_id=radar, + mode=mode, + wait=files, + ) + + datalake_files = get_files_datalake( + bucket_name=bucket_name, + prefix=prefix, + radar=radar, + product=product, + 
date=date, + greater_than=greater_than, + check_datalake_files=check_datalake_files, + mode=mode, + wait=files, + ) + + files_to_download = select_files_to_download( + files=files, + redis_files=redis_files, + datalake_files=datalake_files, + date=date, + greater_than=greater_than, + get_only_last_file=get_only_last_file, + ) + + files_to_upload = download_files( + client=client, files=files_to_download, radar=radar + ) + + upload_files = upload_file_to_gcs.map( + file_to_upload=files_to_upload, + bucket_name=unmapped(bucket_name), + prefix=unmapped(prefix), + mode=unmapped(mode), + radar=unmapped(radar), + product=unmapped(product), + ) + + save_on_redis( + dataset_id="meio_ambiente_clima", + table_id=radar, + mode=mode, + files=files_to_upload, + keep_last=14400, # last 30 days files + wait=upload_files, + ) + +inea_ftp_radar_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +inea_ftp_radar_flow.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value], +) +inea_ftp_radar_flow.schedule = every_5_minutes + +inea_ftp_radar_flow_mac = deepcopy(inea_ftp_radar_flow) +inea_ftp_radar_flow_mac.name = "INEA: Captura FTP dados de radar (Macaé)" +inea_ftp_radar_flow_mac.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +inea_ftp_radar_flow_mac.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value], +) +inea_ftp_radar_flow_mac.schedule = every_5_minutes_mac + +inea_ftp_radar_flow_fill_missing = deepcopy(inea_ftp_radar_flow) +inea_ftp_radar_flow_fill_missing.name = ( + "INEA: Captura FTP dados de radar (Guaratiba): preenchimento de arquivos faltantes" +) +inea_ftp_radar_flow_fill_missing.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +inea_ftp_radar_flow_fill_missing.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value], +) +inea_ftp_radar_flow_fill_missing.schedule = every_1_day + +inea_ftp_radar_flow_fill_missing_mac = deepcopy(inea_ftp_radar_flow) +inea_ftp_radar_flow_fill_missing_mac.name = ( + "INEA: Captura FTP dados de radar (Macaé): preenchimento de arquivos faltantes" +) +inea_ftp_radar_flow_fill_missing_mac.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +inea_ftp_radar_flow_fill_missing_mac.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value], +) +inea_ftp_radar_flow_fill_missing_mac.schedule = every_1_day_mac + +inea_ftp_backfill_radar_flow = deepcopy(inea_ftp_radar_flow) +inea_ftp_backfill_radar_flow.name = "INEA: Captura dados de radar (backfill)" +inea_ftp_backfill_radar_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +inea_ftp_backfill_radar_flow.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_ESCRITORIO_DEV_AGENT_LABEL.value], +) +inea_ftp_backfill_radar_flow.schedule = None diff --git a/pipelines/rj_escritorio/dump_ftp_inea/schedules.py b/pipelines/rj_escritorio/dump_ftp_inea/schedules.py new file mode 100644 index 000000000..a8db99996 --- /dev/null +++ b/pipelines/rj_escritorio/dump_ftp_inea/schedules.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103 +""" +Schedules for the INEA flows. 
+""" + +from datetime import timedelta, datetime + +from prefect.schedules import Schedule +from prefect.schedules.clocks import IntervalClock +import pytz + +from pipelines.constants import constants + +every_5_minutes = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(minutes=5), + start_date=datetime(2021, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")), + labels=[constants.INEA_AGENT_LABEL.value], + parameter_defaults={ + "bucket_name": "rj-escritorio-dev", + "convert_params": "-k=ODIM2.1 -M=All", + "mode": "prod", + "output_format": "HDF5", + "prefix": "raw/meio_ambiente_clima/inea_radar_hdf5", + "product": "ppi", + "radar": "gua", + "vols_remote_directory": "/var/opt/edge/vols", + }, + ) + ] +) +every_5_minutes_mac = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(minutes=5), + start_date=datetime(2021, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")), + labels=[constants.INEA_AGENT_LABEL.value], + parameter_defaults={ + "bucket_name": "rj-escritorio-dev", + "convert_params": "-k=ODIM2.1 -M=All", + "mode": "prod", + "output_format": "HDF5", + "prefix": "raw/meio_ambiente_clima/inea_radar_hdf5", + "product": "ppi", + "radar": "mac", + "vols_remote_directory": "/var/opt/edge/vols", + }, + ) + ] +) +every_1_day = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2021, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")), + labels=[constants.INEA_AGENT_LABEL.value], + parameter_defaults={ + "bucket_name": "rj-escritorio-dev", + "convert_params": "-k=ODIM2.1 -M=All", + "mode": "prod", + "output_format": "HDF5", + "prefix": "raw/meio_ambiente_clima/inea_radar_hdf5", + "product": "ppi", + "radar": "gua", + "get_only_last_file": False, + "vols_remote_directory": "/var/opt/edge/vols", + }, + ) + ] +) +every_1_day_mac = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2021, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")), + labels=[constants.INEA_AGENT_LABEL.value], + parameter_defaults={ + "bucket_name": "rj-escritorio-dev", + "convert_params": "-k=ODIM2.1 -M=All", + "mode": "prod", + "output_format": "HDF5", + "prefix": "raw/meio_ambiente_clima/inea_radar_hdf5", + "product": "ppi", + "radar": "mac", + "get_only_last_file": False, + "vols_remote_directory": "/var/opt/edge/vols", + }, + ) + ] +) diff --git a/pipelines/rj_escritorio/dump_ftp_inea/tasks.py b/pipelines/rj_escritorio/dump_ftp_inea/tasks.py new file mode 100644 index 000000000..cbdd6864a --- /dev/null +++ b/pipelines/rj_escritorio/dump_ftp_inea/tasks.py @@ -0,0 +1,294 @@ +# -*- coding: utf-8 -*- +""" +Tasks to dump data from a INEA FTP to BigQuery +""" +# pylint: disable=E0702,E1137,E1136,E1101,W0613 +from datetime import datetime, timedelta +from pathlib import Path +from typing import List + +from google.cloud import storage +from prefect import task +from prefect.engine.signals import ENDRUN +from prefect.engine.state import Skipped + +from pipelines.utils.ftp.client import FTPClient +from pipelines.utils.utils import ( + log, + get_credentials_from_env, + get_vault_secret, + list_blobs_with_prefix, +) + + +@task(nout=2, max_retries=2, retry_delay=timedelta(seconds=10)) +# pylint: disable=too-many-arguments,too-many-locals, too-many-branches +def get_files_datalake( + bucket_name: str, + prefix: str, + radar: str, + product: str, + date: str = None, + greater_than: str = None, + check_datalake_files: bool = True, + mode: str = "prod", + wait=None, # pylint: disable=unused-argument +) -> List[str]: + """ + List files from INEA 
that are already saved in the datalake
+
+    Args:
+        bucket_name (str): GCS bucket where the datalake files are stored
+        prefix (str): Blob prefix under which the radar files are stored
+        radar (str): Radar name. Must be `gua` or `mac`
+        product (str): Radar product (e.g. "ppi")
+        date (str): Date of the files to be fetched (e.g. 2022-01-25)
+        greater_than (str): Fetch files with a date greater than this one (e.g. 2022-01-25)
+        check_datalake_files (bool): Whether to list the files already stored in the datalake
+        mode (str): Environment mode passed to the blob listing (default "prod")
+
+    How to use:
+        to get real-time data:
+            leave `greater_than` and `date` as None and set `get_only_last_file` to True.
+            This prevents the flow from getting stuck processing every pending file after
+            an incident stops the flow; otherwise it would take a long time to process all
+            files before catching up to real time.
+        to fill missing files up to two days ago:
+            leave `greater_than` and `date` as None and set `get_only_last_file` to False
+        for backfills, or to fill missing files for dates more than two days ago:
+            set a `greater_than` date, leave `date` as None and set `get_only_last_file` to False
+        to get all files for a single day:
+            leave `greater_than` as None, set `get_only_last_file` to False and fill in `date`
+    """
+
+    if check_datalake_files:
+        search_prefix = f"{prefix}/radar={radar}/produto={product}"
+
+        # Get today's blobs
+        if date:
+            # Use a plain `date` object so it can be compared with `past_date` below
+            current_date = datetime.strptime(date, "%Y-%m-%d").date()
+        else:
+            current_date = datetime.now().date()
+
+        if greater_than is None:
+            past_date = current_date - timedelta(days=1)
+        else:
+            past_date = datetime.strptime(greater_than, "%Y-%m-%d")
+            past_date = past_date.date()
+
+        blobs = []
+        # Next, we get past day's blobs
+        while past_date <= current_date:
+            past_date_str = past_date.strftime("%Y-%m-%d")
+            past_blobs = list_blobs_with_prefix(
+                bucket_name=bucket_name,
+                prefix=f"{search_prefix}/data_particao={past_date_str}",
+                mode=mode,
+            )
+            log(
+                f"Searched for blobs with prefix {search_prefix}/data_particao={past_date_str}"
+            )
+            # Then, we merge the two lists
+            blobs += past_blobs
+            past_date += timedelta(days=1)
+
+        # Now, we sort it by `blob.name`
+        blobs.sort(key=lambda blob: blob.name)
+        # Get only the filenames
+        datalake_files = [blob.name.split("/")[-1] for blob in blobs]
+        # Format of the name is 9921GUA-PPIVol-20220930-121010-0004.hdf
+        # We need to remove the last block to keep only 9921GUA-PPIVol-20220930-121010
+        datalake_files = ["-".join(fname.split("-")[:-1]) for fname in datalake_files]
+        log(f"Last 10 datalake files: {datalake_files[-10:]}")
+
+    else:
+        datalake_files = []
+        log("This run is not considering datalake files")
+
+    return datalake_files
+
+
+@task
+def get_ftp_client(wait=None):
+    """
+    Get FTP client
+    """
+    inea_secret = get_vault_secret("ftp_inea_radar")
+    hostname = inea_secret["data"]["hostname"]
+    username = inea_secret["data"]["username"]
+    password = inea_secret["data"]["password"]
+
+    return FTPClient(hostname=hostname, username=username, password=password)
+
+
+@task(max_retries=3, retry_delay=timedelta(seconds=30))
+# pylint: disable=too-many-arguments
+def get_files_from_ftp(
+    client,
+    radar: str,
+) -> List[str]:
+    """
+    List files available for download on the FTP server
+    """
+
+    client.connect()
+    files = client.list_files(path=f"./{radar.upper()}/")
+
+    # Skip task if there is no new file on FTP
+    if len(files) == 0:
+        log("No new available files on FTP")
+        skip = Skipped("No new available files on FTP")
+        raise ENDRUN(state=skip)
+
+    log(f"Last 10 files on FTP: {files[-10:]} {len(files)}")
+    log(f"files on FTP: {files}")
+
+    return files
+
+
+@task(max_retries=3, retry_delay=timedelta(seconds=30))
+# pylint: disable=too-many-arguments
+def select_files_to_download(
+    files: list,
+    redis_files: list,
+    datalake_files: list,
+    date: str = None,
+    greater_than: str = None,
+    get_only_last_file: bool = True,
+) -> List[str]:
+    """
+    Select files to download
+
+    Args:
+        files (list): List of files available on the FTP server
+        redis_files (list): List of the last files saved on GCP and Redis
+        datalake_files (list): List of filenames already saved on GCP
+        date (str): Date of the files to be fetched (e.g. 2022-01-25)
+        greater_than (str): Fetch files with a date greater than this one (e.g. 2022-01-25)
+        get_only_last_file (bool): Whether to fetch only the most recent file available
+
+    How to use:
+        to get real-time data:
+            leave `greater_than` and `date` as None and set `get_only_last_file` to True.
+            This prevents the flow from getting stuck processing every pending file after
+            an incident stops the flow; otherwise it would take a long time to process all
+            files before catching up to real time.
+        to fill missing files up to two days ago:
+            leave `greater_than` and `date` as None and set `get_only_last_file` to False
+        for backfills, or to fill missing files for dates more than two days ago:
+            set a `greater_than` date, leave `date` as None and set `get_only_last_file` to False
+        to get all files for a single day:
+            leave `greater_than` as None, set `get_only_last_file` to False and fill in `date`
+    """
+
+    # log(f"\n\nAvailable files on FTP: {files}")
+    # log(f"\nFiles already saved on redis_files: {redis_files}")
+
+    # Files obtained directly from INEA end with 0000 ("9915MAC-PPIVol-20230921-123000-0000.hdf")
+    # Files from the FTP end with an alphanumeric hash ("9915MAC-PPIVol-20230921-142000-54d4.hdf")
+    # We need to be careful when switching from one pipeline to the other
+
+    # Get specific files based on date and greater_than parameters
+    if date:
+        files = [file for file in files if file.split("-")[2] == date.replace("-", "")]
+        log(f"Last 10 files on FTP for date {date}: {files[-10:]}")
+
+    if greater_than:
+        files = [
+            file
+            for file in files
+            if file.split("-")[2] >= greater_than.replace("-", "")
+        ]
+        log(
+            f"Last 10 files on FTP for date greater than {greater_than}: {files[-10:]}"
+        )
+
+    # Check if files are already on Redis
+    files = [file for file in files if file not in redis_files]
+    log(f"Last 10 files on FTP that are not on redis: {files[-10:]}")
+
+    # Check if files are already on the datalake
+    # Some datalake files use the pattern "9915MAC-PPIVol-20230921-123000-0000.hdf"
+    # Files from the FTP use the pattern "./MAC/9915MAC-PPIVol-20230921-123000-3f28.hdf"
+    # We are going to compare "9915MAC-PPIVol-20230921-123000" from both places
+    if len(datalake_files) > 0:
+        log("Removing files that are already on datalake")
+        files = [
+            file
+            for file in files
+            if "-".join(file.split("/")[-1].split("-")[:-1]) not in datalake_files
+        ]
+
+    # Skip task if there is no new file
+    if len(files) == 0:
+        log("No new available files")
+        skip = Skipped("No new available files")
+        raise ENDRUN(state=skip)
+
+    files.sort()
+
+    if get_only_last_file:
+        files = [files[-1]]
+    log(f"\nFiles to be downloaded: {files}")
+    return files
+
+
+@task(max_retries=3, retry_delay=timedelta(seconds=30))
+def download_files(client, files, radar) -> List[str]:
+    """
+    Download files from FTP
+    """
+
+    save_path = Path(radar.upper())
+    save_path.mkdir(parents=True, exist_ok=True)
+
+    client.connect()
+    files_downloaded = []
+    for file in files:
+        log(f"Downloading file: {file}")
+        # file_path = save_path / file
+        file_path = file
+        client.download(remote_path=file, local_path=file_path)
+        files_downloaded.append(file_path)
+    log(f"Downloaded: {files_downloaded}")
+    file = Path(files_downloaded[0])
+    
log(f"DEBUGGGG: {file.name.split('-')[2]}") + return files_downloaded + + +@task(max_retries=3, retry_delay=timedelta(seconds=30)) +# pylint: disable=too-many-arguments, too-many-locals +def upload_file_to_gcs( + file_to_upload: str, + bucket_name: str, + prefix: str, + radar: str, + product: str, + mode="prod", + task_mode="partitioned", + unlink: bool = True, +): + """ + Upload files to GCS + """ + credentials = get_credentials_from_env(mode=mode) + storage_client = storage.Client(credentials=credentials) + + bucket = storage_client.bucket(bucket_name) + + file = Path(file_to_upload) + if task_mode == "partitioned": + log(f"DEBUG: {file} e {file.name}") + date_str = file.name.split("-")[2] + date = datetime.strptime(date_str, "%Y%m%d").strftime("%Y-%m-%d") + blob_name = ( + f"{prefix}/radar={radar}/produto={product}/data_particao={date}/{file.name}" + ) + blob_name = blob_name.replace("//", "/") + elif task_mode == "raw": + blob_name = f"{prefix}/{file.name}" + + log(f"Uploading file {file} to GCS...") + log(f"Blob name will be {blob_name}") + blob = bucket.blob(blob_name) + blob.upload_from_filename(file) + log(f"File {file} uploaded to GCS.") + if unlink: + file.unlink() diff --git a/pipelines/rj_segovi/dump_db_1746/schedules.py b/pipelines/rj_segovi/dump_db_1746/schedules.py index 50e416106..f8f6819ad 100644 --- a/pipelines/rj_segovi/dump_db_1746/schedules.py +++ b/pipelines/rj_segovi/dump_db_1746/schedules.py @@ -374,7 +374,8 @@ case when cv.ic_vinculo = 'O' or cv.ic_vinculo = 'S' then cv.id_chamado_pai_fk end ) as 'reclamacoes', - no_justificativa + no_justificativa, + oc.id_origem_ocorrencia from tb_chamado as ch inner join ( @@ -550,7 +551,8 @@ chs.dt_alvo_finalizacao, chs.dt_alvo_diagnostico, cl.dt_real_diagnostico, - no_justificativa + no_justificativa, + oc.id_origem_ocorrencia """ _1746_queries = { diff --git a/pipelines/rj_smfp/__init__.py b/pipelines/rj_smfp/__init__.py index 022606109..a8b9019d0 100644 --- a/pipelines/rj_smfp/__init__.py +++ b/pipelines/rj_smfp/__init__.py @@ -6,6 +6,7 @@ from pipelines.rj_smfp.dump_db_ergon_comlurb.flows import * from pipelines.rj_smfp.dump_db_metas.flows import * from pipelines.rj_smfp.dump_db_sigma.flows import * +from pipelines.rj_smfp.dump_db_sigma_compras_materiais.flows import * from pipelines.rj_smfp.dump_inadimplente.flows import * from pipelines.rj_smfp.dump_url_metas.flows import * from pipelines.rj_smfp.goals_dashboard_dbt.flows import * diff --git a/pipelines/rj_smfp/dump_db_ergon/flows.py b/pipelines/rj_smfp/dump_db_ergon/flows.py index dbc04cb08..4e0324338 100644 --- a/pipelines/rj_smfp/dump_db_ergon/flows.py +++ b/pipelines/rj_smfp/dump_db_ergon/flows.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -Database dumping flows for segovi project +Database dumping flows for segovi project. 
""" from copy import deepcopy diff --git a/pipelines/rj_smfp/dump_db_ergon/schedules.py b/pipelines/rj_smfp/dump_db_ergon/schedules.py index d709e913b..cb5b9b2b0 100644 --- a/pipelines/rj_smfp/dump_db_ergon/schedules.py +++ b/pipelines/rj_smfp/dump_db_ergon/schedules.py @@ -206,6 +206,7 @@ TOTAL_ANOS,DATA_PROXIMO,NOME_PROXIMO,EMP_CODIGO FROM ERGON.TOTAL_CONTA """, + "interval": timedelta(days=15), }, "pre_contagem": { "materialize_after_dump": True, diff --git a/pipelines/rj_smfp/dump_db_sigma_compras_materiais/__init__.py b/pipelines/rj_smfp/dump_db_sigma_compras_materiais/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/rj_smfp/dump_db_sigma_compras_materiais/flows.py b/pipelines/rj_smfp/dump_db_sigma_compras_materiais/flows.py new file mode 100644 index 000000000..8c8c1fdf9 --- /dev/null +++ b/pipelines/rj_smfp/dump_db_sigma_compras_materiais/flows.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +""" +Database dumping flows for SMFP SIGMA COMPRAS MATERIAIS +""" + +from copy import deepcopy + +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS + +from pipelines.constants import constants + +# importa o schedule +from pipelines.rj_smfp.dump_db_sigma_compras_materiais.schedules import ( + compras_sigma_daily_update_schedule, +) +from pipelines.utils.dump_db.flows import dump_sql_flow +from pipelines.utils.utils import set_default_parameters + +rj_smfp_dump_db_sigma_medicamentos_flow = deepcopy(dump_sql_flow) +rj_smfp_dump_db_sigma_medicamentos_flow.name = ( + "SMFP: COMPRAS MATERIAIS SERVICOS SIGMA - Ingerir tabelas de banco SQL" +) +rj_smfp_dump_db_sigma_medicamentos_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) + +rj_smfp_dump_db_sigma_medicamentos_flow.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[ + constants.RJ_SMFP_AGENT_LABEL.value, # label do agente + ], +) + +rj_smfp_dump_db_sigma_medicamentos_default_parameters = { + "db_database": "CP01.SMF", + "db_host": "10.90.31.22", + "db_port": "1521", + "db_type": "oracle", + "dataset_id": "compras_materiais_servicos_sigma", + "vault_secret_path": "db-sigma", +} + +rj_smfp_dump_db_sigma_medicamentos_flow = set_default_parameters( + rj_smfp_dump_db_sigma_medicamentos_flow, + default_parameters=rj_smfp_dump_db_sigma_medicamentos_default_parameters, +) + +rj_smfp_dump_db_sigma_medicamentos_flow.schedule = compras_sigma_daily_update_schedule diff --git a/pipelines/rj_smfp/dump_db_sigma_compras_materiais/schedules.py b/pipelines/rj_smfp/dump_db_sigma_compras_materiais/schedules.py new file mode 100644 index 000000000..dd6847c56 --- /dev/null +++ b/pipelines/rj_smfp/dump_db_sigma_compras_materiais/schedules.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +""" +Schedules for the SMS SIGMA dump_db pipeline. 
+""" + +from datetime import timedelta, datetime + +from prefect.schedules import Schedule +import pytz + +from pipelines.constants import constants +from pipelines.utils.dump_db.utils import generate_dump_db_schedules +from pipelines.utils.utils import untuple_clocks as untuple + + +##################################### +# +# SMS SIGMA Schedules +# +##################################### + +_sigma_queries = { + "classe": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CD_GRUPO, + CD_CLASSE, + DS_CLASSE, + ST_STATUS + FROM SIGMA.VW_CLASSE + """, # noqa + }, + "fornecedor": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CPF_CNPJ, + TIPO_CPF_CNPJ, + INSCRICAO_MUNICIPAL, + INSCRICAO_ESTADUAL, + RAZAO_SOCIAL, + NOME_FANTASIA, + NOME_CONTATO, + EMAIL, + EMAIL_CONTATO, + FAX, + DDD, + DDI, + RAMAL, + TELEFONE, + LOGRADOURO, + NUMERO_PORTA, + COMPLEMENTO, + BAIRRO, + MUNICIPIO, + UF, + CEP, + ATIVO_INATIVO_BLOQUEADO, + CD_NATUREZA_JURIDICA, + DS_NATUREZA_JURIDICA, + RAMO_ATIVIDADE, + CD_PORTE_EMPRESA, + DATA_ULTIMA_ATUALIZACAO, + FORNECEDOR_EVENTUAL + FROM SIGMA.VW_FORNECEDOR + """, # noqa + }, + "fornecedor_sem_vinculo": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CPF_CNPJ, + TIPO_CPF_CNPJ, + NOME, + NUMERO_PORTA, + COMPLEMENTO + FROM SIGMA.VW_FORNECEDOR_SEM_VINCULO + """, # noqa + }, + "grupo": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CD_GRUPO, + DS_GRUPO, + ST_STATUS + FROM SIGMA.VW_GRUPO + """, # noqa + }, + "material": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CD_MATERIAL, + CD_GRUPO, + CD_CLASSE, + CD_SUBCLASSE, + SEQUENCIAL, + DV1, + DV2, + NM_PADRONIZADO, + NM_COMPLEMENTAR_MATERIAL, + UNIDADE, + DS_DETALHE_MATERIAL, + DT_DESATIVACAO, + ST_STATUS, + REMUME + FROM SIGMA.VW_MATERIAL + """, # noqa + }, + "movimentacao": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CD_MATERIAL, + CNPJ_FORNECEDOR, + NOTA_FISCAL, + SERIE, + DATA_NOTA_FISCAL, + QUANTIDADE_ITEM, + PRECO_ITEM, + TOTAL_ITEM, + DATA_ULTIMA_ATUALIZACAO, + CD_MOVIMENTACAO, + DS_MOVIMENTACAO, + TP_ALMOXARIFADO, + CD_SECRETARIA, + DS_SECRETARIA, + CD_ALMOXARIFADO_DESTINO, + DS_ALMOXARIFADO_DESTINO, + CD_ALMOXARIFADO_ORIGEM, + DS_ALMOXARIFADO_ORIGEM, + CD_OS, + DT_INI_CONTRATO_OS, + DT_FIM_CONTRATO_OS, + NR_EMPENHO, + CNPJ_FABRICANTE + FROM SIGMA.VW_MOVIMENTACAO + """, # noqa + "interval": timedelta(days=7), + }, + "ramo_atividade": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CD_RAMO, + DS_RAMO, + ST_RAMO + FROM SIGMA.VW_RAMO_ATIVIDADE + """, # noqa + }, + "servico": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CD_SERV, + CD_SEQ, + CD_SERVICO, + DS_SERVICO, + ST_STATUS + FROM SIGMA.VW_SERVICO + """, # noqa + }, + "subclasse": { + "biglake_table": True, + 
"materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + CD_GRUPO, + CD_CLASSE, + CD_SUBCLASSE, + DS_SUBCLASSE, + ST_STATUS + FROM SIGMA.VW_SUBCLASSE + """, # noqa + }, + "unidade": { + "biglake_table": True, + "materialize_after_dump": True, + "materialization_mode": "prod", + "dump_mode": "overwrite", + "execute_query": """ + SELECT + UNIDADE, + DS_UNIDADE + FROM SIGMA.VW_UNIDADE + """, # noqa + }, +} + +sigma_infra_clocks = generate_dump_db_schedules( + interval=timedelta(days=1), + start_date=datetime(2022, 3, 21, 1, 0, tzinfo=pytz.timezone("America/Sao_Paulo")), + labels=[ + constants.RJ_SMFP_AGENT_LABEL.value, + ], + db_database="CP01.SMF", + db_host="10.90.31.22", + db_port="1521", + db_type="oracle", + dataset_id="compras_materiais_servicos_sigma", + vault_secret_path="db-sigma", + table_parameters=_sigma_queries, +) + +compras_sigma_daily_update_schedule = Schedule(clocks=untuple(sigma_infra_clocks)) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index d7f44e3b9..568f96154 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -7,26 +7,46 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS +from prefect.tasks.prefect import create_flow_run, wait_for_flow_run +from prefect.utilities.edges import unmapped # EMD Imports # from pipelines.constants import constants as emd_constants +from pipelines.utils.decorators import Flow +from pipelines.utils.tasks import ( + rename_current_flow_run_now_time, + get_current_flow_labels, +) + + +from pipelines.utils.utils import set_default_parameters # SMTR Imports # -from pipelines.rj_smtr.flows import default_capture_flow +from pipelines.rj_smtr.flows import ( + default_capture_flow, + default_materialization_flow, +) + +from pipelines.rj_smtr.tasks import ( + get_current_timestamp, +) from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.schedules import ( - bilhetagem_principal_schedule, bilhetagem_transacao_schedule, ) +from pipelines.rj_smtr.constants import constants + +from pipelines.rj_smtr.schedules import every_hour + # Flows # # BILHETAGEM TRANSAÇÃO - CAPTURA A CADA MINUTO # bilhetagem_transacao_captura = deepcopy(default_capture_flow) -bilhetagem_transacao_captura.name = "SMTR: Bilhetagem Transação (captura)" +bilhetagem_transacao_captura.name = "SMTR: Bilhetagem Transação - Captura" bilhetagem_transacao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) bilhetagem_transacao_captura.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, @@ -34,13 +54,91 @@ ) bilhetagem_transacao_captura.schedule = bilhetagem_transacao_schedule -# BILHETAGEM PRINCIPAL - CAPTURA DIÁRIA DE DIVERSAS TABELAS # +# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO # + +bilhetagem_auxiliar_captura = deepcopy(default_capture_flow) +bilhetagem_auxiliar_captura.name = "SMTR: Bilhetagem Auxiliar - Captura (subflow)" +bilhetagem_auxiliar_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_auxiliar_captura.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) + +bilhetagem_auxiliar_captura = set_default_parameters( + flow=bilhetagem_auxiliar_captura, + default_parameters={ + "dataset_id": constants.BILHETAGEM_DATASET_ID.value, + "secret_path": 
constants.BILHETAGEM_SECRET_PATH.value, + "source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"], + }, +) + +# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO +bilhetagem_materializacao = deepcopy(default_materialization_flow) +bilhetagem_materializacao.name = "SMTR: Bilhetagem Transação - Materialização (subflow)" +bilhetagem_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_materializacao.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) + +bilhetagem_materializacao_parameters = { + "dataset_id": constants.BILHETAGEM_DATASET_ID.value +} | constants.BILHETAGEM_MATERIALIZACAO_PARAMS.value + +bilhetagem_materializacao = set_default_parameters( + flow=bilhetagem_materializacao, + default_parameters=bilhetagem_materializacao_parameters, +) + +# TRATAMENTO - RODA DE HORA EM HORA, CAPTURA AUXILIAR + MATERIALIZAÇÃO +with Flow( + "SMTR: Bilhetagem Transação - Tratamento", + code_owners=["caio", "fernanda", "boris", "rodrigo"], +) as bilhetagem_transacao_tratamento: + timestamp = get_current_timestamp() + + rename_flow_run = rename_current_flow_run_now_time( + prefix=bilhetagem_transacao_tratamento.name + " ", + now_time=timestamp, + ) + + LABELS = get_current_flow_labels() + + # Captura + runs_captura = create_flow_run.map( + flow_name=unmapped(bilhetagem_auxiliar_captura.name), + project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), + parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value, + labels=unmapped(LABELS), + ) + + wait_captura = wait_for_flow_run.map( + runs_captura, + stream_states=unmapped(True), + stream_logs=unmapped(True), + raise_final_state=unmapped(True), + ) + + # Materialização + run_materializacao = create_flow_run( + flow_name=bilhetagem_materializacao.name, + project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, + labels=LABELS, + upstream_tasks=[wait_captura], + ) + + wait_materializacao = wait_for_flow_run( + run_materializacao, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) -bilhetagem_principal_captura = deepcopy(default_capture_flow) -bilhetagem_principal_captura.name = "SMTR: Bilhetagem Principal (captura)" -bilhetagem_principal_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) -bilhetagem_principal_captura.run_config = KubernetesRun( +bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +bilhetagem_transacao_tratamento.run_config = KubernetesRun( image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) -bilhetagem_principal_captura.schedule = bilhetagem_principal_schedule +bilhetagem_transacao_tratamento.schedule = every_hour +# bilhetagem_materializacao.schedule = bilhetagem_materializacao_schedule diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py index 38fca85a9..21e13f05b 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py @@ -15,27 +15,18 @@ generate_execute_schedules, ) -bilhetagem_principal_clocks = generate_execute_schedules( - interval=timedelta(days=1), - labels=[ - emd_constants.RJ_SMTR_AGENT_LABEL.value, - ], - table_parameters=constants.BILHETAGEM_TABLES_PARAMS.value, - dataset_id=constants.BILHETAGEM_DATASET_ID.value, - secret_path=constants.BILHETAGEM_SECRET_PATH.value, - runs_interval_minutes=15, -) - -bilhetagem_principal_schedule = 
Schedule(clocks=untuple(bilhetagem_principal_clocks)) - +BILHETAGEM_TRANSACAO_INTERVAL = timedelta(minutes=1) bilhetagem_transacao_clocks = generate_execute_schedules( - interval=timedelta(minutes=1), + clock_interval=timedelta( + **constants.BILHETAGEM_CAPTURE_RUN_INTERVAL.value["transacao_run_interval"] + ), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], - table_parameters=constants.BILHETAGEM_TRANSACAO_TABLE_PARAMS.value, + table_parameters=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value, dataset_id=constants.BILHETAGEM_DATASET_ID.value, secret_path=constants.BILHETAGEM_SECRET_PATH.value, + source_type=constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"], runs_interval_minutes=0, ) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py index 615b9b11f..7d8cf1574 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps/flows.py @@ -106,5 +106,5 @@ image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) -# Seguindo o padrão de captura adotado pelo BRT -captura_stpl.schedule = every_minute +# Captura descontinuada (sem dados), avaliar quando voltar +# captura_stpl.schedule = every_minute diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index 7133b8abe..0037c6989 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -165,9 +165,35 @@ class constants(Enum): # pylint: disable=c0103 # BILHETAGEM BILHETAGEM_DATASET_ID = "br_rj_riodejaneiro_bilhetagem" - BILHETAGEM_TRANSACAO_TABLE_PARAMS = [ - { - "table_id": "transacao", + + BILHETAGEM_GENERAL_CAPTURE_PARAMS = { + "databases": { + "principal_db": { + "engine": "mysql", + "host": "principal-database-replica.internal", + }, + "tarifa_db": { + "engine": "postgres", + "host": "tarifa-database-replica.internal", + }, + "transacao_db": { + "engine": "postgres", + "host": "transacao-database-replica.internal", + }, + }, + "vpn_url": "http://vpn-jae.mobilidade.rio/", + "source_type": "api-json", + } + + BILHETAGEM_CAPTURE_RUN_INTERVAL = { + "transacao_run_interval": {"minutes": 1}, + "principal_run_interval": {"hours": 1}, + } + + BILHETAGEM_TRANSACAO_CAPTURE_PARAMS = { + "table_id": "transacao", + "partition_date_only": False, + "extract_params": { "database": "transacao_db", "query": """ SELECT @@ -177,80 +203,104 @@ class constants(Enum): # pylint: disable=c0103 WHERE data_processamento BETWEEN '{start}' AND '{end}' - ORDER BY - data_processamento """, - "primary_key": ["id"], # id column to nest data on - "flag_date_partition": False, + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL["transacao_run_interval"], }, - ] - BILHETAGEM_TABLES_PARAMS = [ + "primary_key": ["id"], # id column to nest data on + } + + BILHETAGEM_SECRET_PATH = "smtr_jae_access_data" + + BILHETAGEM_CAPTURE_PARAMS = [ { "table_id": "linha", - "database": "principal_db", - "query": """ - SELECT - * - FROM - LINHA - WHERE - DT_INCLUSAO >= '{start}' - ORDER BY - DT_INCLUSAO - """, + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + LINHA + WHERE + DT_INCLUSAO >= '{start}' + """, + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[ + "principal_run_interval" + ], + }, "primary_key": ["CD_LINHA"], # id column to nest data on - "flag_date_partition": True, }, { "table_id": "grupo", - "database": "principal_db", - "query": """ - SELECT - * - FROM - GRUPO - WHERE - DT_INCLUSAO 
>= '{start}' - ORDER BY - DT_INCLUSAO - """, - "primary_key": ["CD_GRUPO"], - "flag_date_partition": True, + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + GRUPO + WHERE + DT_INCLUSAO >= '{start}' + """, + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[ + "principal_run_interval" + ], + }, + "primary_key": ["CD_GRUPO"], # id column to nest data on }, { "table_id": "grupo_linha", - "database": "principal_db", - "query": """ - SELECT - * - FROM - GRUPO_LINHA - WHERE - DT_INCLUSAO >= '{start}' - ORDER BY - DT_INCLUSAO - """, + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + GRUPO_LINHA + WHERE + DT_INCLUSAO >= '{start}' + """, + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[ + "principal_run_interval" + ], + }, "primary_key": ["CD_GRUPO", "CD_LINHA"], # id column to nest data on - "flag_date_partition": True, }, { "table_id": "matriz_integracao", - "database": "tarifa_db", - "query": """ - SELECT - * - FROM - matriz_integracao - WHERE - dt_inclusao >= '{start}' - ORDER BY - dt_inclusao - """, + "partition_date_only": True, + "extract_params": { + "database": "tarifa_db", + "query": """ + SELECT + * + FROM + matriz_integracao + WHERE + dt_inclusao >= '{start}' + """, + "run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[ + "principal_run_interval" + ], + }, "primary_key": [ "cd_versao_matriz", "cd_integracao", ], # id column to nest data on - "flag_date_partition": True, }, ] - BILHETAGEM_SECRET_PATH = "smtr_jae_access_data" + + BILHETAGEM_MATERIALIZACAO_PARAMS = { + "table_id": BILHETAGEM_TRANSACAO_CAPTURE_PARAMS["table_id"], + "upstream": True, + "dbt_vars": { + "date_range": { + "table_run_datetime_column_name": "datetime_transacao", + "delay_hours": 1, + }, + "version": {}, + }, + } diff --git a/pipelines/rj_smtr/flows.py b/pipelines/rj_smtr/flows.py index f1d29ed10..d4292129c 100644 --- a/pipelines/rj_smtr/flows.py +++ b/pipelines/rj_smtr/flows.py @@ -6,7 +6,7 @@ from prefect.run_configs import KubernetesRun from prefect.storage import GCS from prefect import case, Parameter -from prefect.tasks.control_flow import merge +from prefect.utilities.edges import unmapped # EMD Imports # @@ -14,107 +14,108 @@ from pipelines.utils.decorators import Flow from pipelines.utils.tasks import ( rename_current_flow_run_now_time, + get_now_time, + get_current_flow_labels, + get_current_flow_mode, ) +from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client # SMTR Imports # from pipelines.rj_smtr.tasks import ( - create_date_partition, create_date_hour_partition, create_local_partition_path, get_current_timestamp, - get_raw, parse_timestamp_to_string, - save_raw_local, - save_treated_local, - upload_logs_to_bq, - bq_upload, - transform_to_nested_structure, -) - -from pipelines.rj_smtr.tasks import ( + transform_raw_to_nested_structure, + create_dbt_run_vars, + set_last_run_timestamp, + coalesce_task, + upload_raw_data_to_gcs, + upload_staging_data_to_gcs, + get_raw_from_sources, create_request_params, - get_datetime_range, ) +from pipelines.utils.execute_dbt_model.tasks import run_dbt_model with Flow( "SMTR: Captura", code_owners=["caio", "fernanda", "boris", "rodrigo"], ) as default_capture_flow: - # SETUP # + # Configuração # - table_params = Parameter("table_params", default=None) - timestamp_param = Parameter("timestamp", default=None) - interval = Parameter("interval", default=None) + table_id = Parameter("table_id", default=None) + 
partition_date_only = Parameter("partition_date_only", default=None) + extract_params = Parameter("extract_params", default=None) dataset_id = Parameter("dataset_id", default=None) secret_path = Parameter("secret_path", default=None) + primary_key = Parameter("primary_key", default=None) + source_type = Parameter("source_type", default=None) - timestamp = get_current_timestamp(timestamp_param) - - datetime_range = get_datetime_range(timestamp, interval=interval) + timestamp = get_current_timestamp() rename_flow_run = rename_current_flow_run_now_time( - prefix=default_capture_flow.name + " " + table_params["table_id"] + ": ", + prefix=default_capture_flow.name + " " + table_id + ": ", now_time=timestamp, ) - request_params, request_url = create_request_params( - datetime_range=datetime_range, - table_params=table_params, - secret_path=secret_path, - dataset_id=dataset_id, + partitions = create_date_hour_partition( + timestamp, partition_date_only=partition_date_only ) - with case(table_params["flag_date_partition"], True): - date_partitions = create_date_partition(timestamp) - - with case(table_params["flag_date_partition"], False): - date_hour_partitions = create_date_hour_partition(timestamp) - - partitions = merge(date_partitions, date_hour_partitions) - filename = parse_timestamp_to_string(timestamp) filepath = create_local_partition_path( dataset_id=dataset_id, - table_id=table_params["table_id"], + table_id=table_id, filename=filename, partitions=partitions, ) - raw_status = get_raw( - url=request_url, - headers=secret_path, - params=request_params, + # Extração # + request_params, request_path = create_request_params( + dataset_id=dataset_id, + extract_params=extract_params, + table_id=table_id, + timestamp=timestamp, ) - raw_filepath = save_raw_local(status=raw_status, file_path=filepath) + error, raw_filepath = get_raw_from_sources( + source_type=source_type, + local_filepath=filepath, + source_path=request_path, + dataset_id=dataset_id, + table_id=table_id, + secret_path=secret_path, + request_params=request_params, + ) - # TREAT & CLEAN # - treated_status = transform_to_nested_structure( - status=raw_status, - timestamp=timestamp, - primary_key=table_params["primary_key"], + error = upload_raw_data_to_gcs( + error=error, + raw_filepath=raw_filepath, + table_id=table_id, + dataset_id=dataset_id, + partitions=partitions, ) - treated_filepath = save_treated_local(status=treated_status, file_path=filepath) + # Pré-tratamento # - # LOAD # - error = bq_upload( - dataset_id=dataset_id, - table_id=table_params["table_id"], - filepath=treated_filepath, + error, staging_filepath = transform_raw_to_nested_structure( raw_filepath=raw_filepath, - partitions=partitions, - status=treated_status, + filepath=filepath, + error=error, + timestamp=timestamp, + primary_key=primary_key, ) - upload_logs_to_bq( - dataset_id=dataset_id, - parent_table_id=table_params["table_id"], + STAGING_UPLOADED = upload_staging_data_to_gcs( error=error, + staging_filepath=staging_filepath, timestamp=timestamp, + table_id=table_id, + dataset_id=dataset_id, + partitions=partitions, ) default_capture_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) @@ -122,3 +123,72 @@ image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) + +with Flow( + "SMTR: Materialização", + code_owners=["caio", "fernanda", "boris", "rodrigo"], +) as default_materialization_flow: + # SETUP # + + dataset_id = Parameter("dataset_id", default=None) + table_id = Parameter("table_id", default=None) + 
raw_table_id = Parameter("raw_table_id", default=None) + dbt_alias = Parameter("dbt_alias", default=False) + upstream = Parameter("upstream", default=None) + downstream = Parameter("downstream", default=None) + exclude = Parameter("exclude", default=None) + flags = Parameter("flags", default=None) + dbt_vars = Parameter("dbt_vars", default=dict()) + + LABELS = get_current_flow_labels() + MODE = get_current_flow_mode(LABELS) + + _vars, date_var, flag_date_range = create_dbt_run_vars( + dataset_id=dataset_id, + dbt_vars=dbt_vars, + table_id=table_id, + raw_dataset_id=dataset_id, + raw_table_id=raw_table_id, + mode=MODE, + ) + + # Rename flow run + + flow_name_prefix = coalesce_task([table_id, dataset_id]) + + flow_name_now_time = coalesce_task([date_var, get_now_time()]) + + rename_flow_run = rename_current_flow_run_now_time( + prefix=default_materialization_flow.name + " " + flow_name_prefix + ": ", + now_time=flow_name_now_time, + ) + + dbt_client = get_k8s_dbt_client(mode=MODE, wait=rename_flow_run) + + RUNS = run_dbt_model.map( + dbt_client=unmapped(dbt_client), + dataset_id=unmapped(dataset_id), + table_id=unmapped(table_id), + _vars=_vars, + dbt_alias=unmapped(dbt_alias), + upstream=unmapped(upstream), + downstream=unmapped(downstream), + exclude=unmapped(exclude), + flags=unmapped(flags), + ) + + with case(flag_date_range, True): + set_last_run_timestamp( + dataset_id=dataset_id, + table_id=table_id, + timestamp=date_var["date_range_end"], + wait=RUNS, + mode=MODE, + ) + + +default_materialization_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +default_materialization_flow.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index de52c03df..f7d687dea 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -8,7 +8,7 @@ import os from pathlib import Path import traceback -from typing import Dict, List +from typing import Dict, List, Union, Iterable import io from basedosdados import Storage, Table @@ -28,6 +28,14 @@ get_last_run_timestamp, log_critical, data_info_str, + dict_contains_keys, + get_raw_data_api, + get_raw_data_gcs, + upload_run_logs_to_bq, + get_datetime_range, + read_raw_data, + save_treated_local_func, + save_raw_local_func, ) from pipelines.utils.execute_dbt_model.utils import get_dbt_client from pipelines.utils.utils import log, get_redis_client, get_vault_secret @@ -158,19 +166,23 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datet @task -def create_date_hour_partition(timestamp: datetime) -> str: - """ - Get date hour Hive partition structure from timestamp. +def create_date_hour_partition( + timestamp: datetime, partition_date_only: bool = False +) -> str: """ - return f"data={timestamp.strftime('%Y-%m-%d')}/hora={timestamp.strftime('%H')}" + Create a date (and hour) Hive partition structure from timestamp. + Args: + timestamp (datetime): timestamp to be used as reference + partition_date_only (bool, optional): whether to add hour partition or not -@task -def create_date_partition(timestamp: datetime) -> str: - """ - Get date hour Hive partition structure from timestamp. 
+ Returns: + str: partition string """ - return f"data={timestamp.date()}" + partition = f"data={timestamp.strftime('%Y-%m-%d')}" + if not partition_date_only: + partition += f"/hora={timestamp.strftime('%H')}" + return partition @task @@ -181,34 +193,6 @@ def parse_timestamp_to_string(timestamp: datetime, pattern="%Y-%m-%d-%H-%M-%S") return timestamp.strftime(pattern) -@task -def create_current_date_hour_partition(capture_time=None): - """Create partitioned directory structure to save data locally based - on capture time. - - Args: - capture_time(pendulum.datetime.DateTime, optional): - if recapturing data, will create partitions based - on the failed timestamps being recaptured - - Returns: - dict: "filename" contains the name which to upload the csv, "partitions" contains - the partitioned directory path - """ - if capture_time is None: - capture_time = datetime.now(tz=constants.TIMEZONE.value).replace( - minute=0, second=0, microsecond=0 - ) - date = capture_time.strftime("%Y-%m-%d") - hour = capture_time.strftime("%H") - - return { - "filename": capture_time.strftime("%Y-%m-%d-%H-%M-%S"), - "partitions": f"data={date}/hora={hour}", - "timestamp": capture_time, - } - - @task def create_local_partition_path( dataset_id: str, table_id: str, filename: str, partitions: str = None @@ -448,15 +432,123 @@ def get_raw( # pylint: disable=R0912 "Unsupported raw file extension. Supported only: json, csv and txt" ) - except Exception as exp: - error = exp - - if error is not None: + except Exception: + error = traceback.format_exc() log(f"[CATCHED] Task failed with error: \n{error}", level="error") return {"data": data, "error": error} +@task(checkpoint=False, nout=2) +def create_request_params( + extract_params: dict, + table_id: str, + dataset_id: str, + timestamp: datetime, +) -> tuple[str, str]: + """ + Task to create request params + + Args: + extract_params (dict): extract parameters + table_id (str): table_id on BigQuery + dataset_id (str): dataset_id on BigQuery + timestamp (datetime): timestamp for flow run + + Returns: + request_params: host, database and query to request data + request_url: url to request data + """ + request_params = None + request_url = None + + if dataset_id == constants.BILHETAGEM_DATASET_ID.value: + database = constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["databases"][ + extract_params["database"] + ] + request_url = ( + constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["vpn_url"] + + database["engine"] + ) + + datetime_range = get_datetime_range( + timestamp=timestamp, interval=timedelta(**extract_params["run_interval"]) + ) + + request_params = { + "host": database["host"], # TODO: exibir no log em ambiente fechado + "database": extract_params["database"], + "query": extract_params["query"].format(**datetime_range), + } + + return request_params, request_url + + +@task(checkpoint=False, nout=2) +def get_raw_from_sources( + source_type: str, + local_filepath: str, + source_path: str = None, + dataset_id: str = None, + table_id: str = None, + secret_path: str = None, + request_params: dict = None, +) -> tuple[str, str]: + """ + Task to get raw data from sources + + Args: + source_type (str): source type + local_filepath (str): local filepath + source_path (str, optional): source path. Defaults to None. + dataset_id (str, optional): dataset_id on BigQuery. Defaults to None. + table_id (str, optional): table_id on BigQuery. Defaults to None. + secret_path (str, optional): secret path. Defaults to None. + request_params (dict, optional): request parameters. 
Defaults to None. + + Returns: + error: error catched from upstream tasks + filepath: filepath to raw data + """ + error = None + filepath = None + data = None + + source_values = source_type.split("-", 1) + + source_type, filetype = ( + source_values if len(source_values) == 2 else (source_values[0], None) + ) + + log(f"Getting raw data from source type: {source_type}") + + try: + if source_type == "api": + error, data, filetype = get_raw_data_api( + url=source_path, + secret_path=secret_path, + api_params=request_params, + filetype=filetype, + ) + elif source_type == "gcs": + error, data, filetype = get_raw_data_gcs( + dataset_id=dataset_id, table_id=table_id, zip_filename=request_params + ) + else: + raise NotImplementedError(f"{source_type} not supported") + + filepath = save_raw_local_func( + data=data, filepath=local_filepath, filetype=filetype + ) + + except NotImplementedError: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + log(f"Raw extraction ended returned values: {error}, {filepath}") + return error, filepath + + ############### # # Load data @@ -502,11 +594,8 @@ def bq_upload( if status["error"] is not None: return status["error"] - if len(status["data"]) == 0: - log("Empty dataframe, skipping upload") - return None - error = None + try: # Upload raw to staging if raw_filepath: @@ -633,6 +722,101 @@ def upload_logs_to_bq( # pylint: disable=R0913 raise Exception(f"Pipeline failed with error: {error}") +@task +def upload_raw_data_to_gcs( + error: str, + raw_filepath: str, + table_id: str, + dataset_id: str, + partitions: list, +) -> Union[str, None]: + """ + Upload raw data to GCS. + + Args: + error (str): Error catched from upstream tasks. + raw_filepath (str): Path to the saved raw .json file + table_id (str): table_id on BigQuery + dataset_id (str): dataset_id on BigQuery + partitions (list): list of partition strings + + Returns: + Union[str, None]: if there is an error returns it traceback, otherwise returns None + """ + if error is None: + try: + st_obj = Storage(table_id=table_id, dataset_id=dataset_id) + log( + f"""Uploading raw file to bucket {st_obj.bucket_name} at + {st_obj.bucket_name}/{dataset_id}/{table_id}""" + ) + st_obj.upload( + path=raw_filepath, + partitions=partitions, + mode="raw", + if_exists="replace", + ) + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return error + + +@task +def upload_staging_data_to_gcs( + error: str, + staging_filepath: str, + timestamp: datetime, + table_id: str, + dataset_id: str, + partitions: list, +) -> Union[str, None]: + """ + Upload staging data to GCS. + + Args: + error (str): Error catched from upstream tasks. + staging_filepath (str): Path to the saved treated .csv file. + timestamp (datetime): timestamp for flow run. + table_id (str): table_id on BigQuery. + dataset_id (str): dataset_id on BigQuery. + partitions (list): list of partition strings. 
+ + Returns: + Union[str, None]: if there is an error returns it traceback, otherwise returns None + """ + if error is None: + try: + # Creates and publish table if it does not exist, append to it otherwise + create_or_append_table( + dataset_id=dataset_id, + table_id=table_id, + path=staging_filepath, + partitions=partitions, + ) + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + upload_run_logs_to_bq( + dataset_id=dataset_id, + parent_table_id=table_id, + error=error, + timestamp=timestamp, + mode="staging", + ) + + return error + + +############### +# +# Daterange tasks +# +############### + + @task( checkpoint=False, max_retries=constants.MAX_RETRIES.value, @@ -823,140 +1007,206 @@ def get_previous_date(days): return now.to_date_string() -@task -def transform_to_nested_structure( - status: dict, timestamp: datetime, primary_key: list = None -): - """Transform dataframe to nested structure +############### +# +# Pretreat data +# +############### - Args: - status (dict): Must contain keys - * `data`: dataframe returned from treatement - * `error`: error catched from data treatement - timestamp (datetime): timestamp of the capture - primary_key (list, optional): List of primary keys to be used for nesting. - Returns: - dict: Conatining keys - * `data` (json): nested data - * `error` (str): catched error, if any. Otherwise, returns None +@task(nout=2) +def transform_raw_to_nested_structure( + raw_filepath: str, + filepath: str, + error: str, + timestamp: datetime, + primary_key: list = None, +) -> tuple[str, str]: """ + Task to transform raw data to nested structure - # Check previous error - if status["error"] is not None: - return {"data": pd.DataFrame(), "error": status["error"]} - - # Check empty dataframe - if len(status["data"]) == 0: - log("Empty dataframe, skipping transformation") - return {"data": pd.DataFrame(), "error": status["error"]} + Args: + raw_filepath (str): Path to the saved raw .json file + filepath (str): Path to the saved treated .csv file + error (str): Error catched from upstream tasks + timestamp (datetime): timestamp for flow run + primary_key (list, optional): Primary key to be used on nested structure - try: - if primary_key is None: - primary_key = [] + Returns: + str: Error traceback + str: Path to the saved treated .csv file + """ + if error is None: + try: + # leitura do dado raw + error, data = read_raw_data(filepath=raw_filepath) - error = None - data = pd.DataFrame(status["data"]) + if primary_key is None: + primary_key = [] - log( - f""" - Received inputs: - - timestamp:\n{timestamp} - - data:\n{data.head()}""" - ) + log( + f""" + Received inputs: + - timestamp:\n{timestamp} + - data:\n{data.head()}""" + ) - log(f"Raw data:\n{data_info_str(data)}", level="info") + # Check empty dataframe + if data.empty: + log("Empty dataframe, skipping transformation...") + else: + log(f"Raw data:\n{data_info_str(data)}", level="info") + + log("Adding captured timestamp column...", level="info") + data["timestamp_captura"] = timestamp + + log("Striping string columns...", level="info") + for col in data.columns[data.dtypes == "object"].to_list(): + data[col] = data[col].str.strip() + + log(f"Finished cleaning! 
Data:\n{data_info_str(data)}", level="info") + + log("Creating nested structure...", level="info") + pk_cols = primary_key + ["timestamp_captura"] + data = ( + data.groupby(pk_cols) + .apply( + lambda x: x[data.columns.difference(pk_cols)].to_json( + orient="records" + ) + ) + .str.strip("[]") + .reset_index(name="content")[ + primary_key + ["content", "timestamp_captura"] + ] + ) - log("Adding captured timestamp column...", level="info") - data["timestamp_captura"] = timestamp + log( + f"Finished nested structure! Data:\n{data_info_str(data)}", + level="info", + ) - log("Striping string columns...", level="info") - for col in data.columns[data.dtypes == "object"].to_list(): - data[col] = data[col].str.strip() + # save treated data locally + filepath = save_treated_local_func( + data=data, error=error, filepath=filepath + ) - log(f"Finished cleaning! Data:\n{data_info_str(data)}", level="info") + except Exception: # pylint: disable=W0703 + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") - log("Creating nested structure...", level="info") - pk_cols = primary_key + ["timestamp_captura"] - data = ( - data.groupby(pk_cols) - .apply( - lambda x: x[data.columns.difference(pk_cols)].to_json(orient="records") - ) - .str.strip("[]") - .reset_index(name="content")[primary_key + ["content", "timestamp_captura"]] - ) + return error, filepath - log( - f"Finished nested structure! Data:\n{data_info_str(data)}", - level="info", - ) - except Exception as exp: # pylint: disable=W0703 - error = exp +@task(checkpoint=False) +def coalesce_task(value_list: Iterable): + """ + Task to get the first non-None value of a list - if error is not None: - log(f"[CATCHED] Task failed with error: \n{error}", level="error") + Args: + value_list (Iterable): an iterable object with the values + Returns: + any: value_list's first non-None item + """ - return {"data": data, "error": error} + try: + return next(value for value in value_list if value is not None) + except StopIteration: + return -@task(checkpoint=False) -def get_datetime_range( - timestamp: datetime, - interval: int, -) -> dict: +@task(checkpoint=False, nout=3) +def create_dbt_run_vars( + dataset_id: str, + dbt_vars: dict, + table_id: str, + raw_dataset_id: str, + raw_table_id: str, + mode: str, +) -> tuple[list[dict], Union[list[dict], dict, None], bool]: """ - Task to get datetime range in UTC + Create the variables to be used in dbt materialization based on a dict Args: - timestamp (datetime): timestamp to get datetime range - interval (int): interval in seconds + dataset_id (str): the dataset_id to get the variables + dbt_vars (dict): dict containing the parameters + table_id (str): the table_id used to get the date_range variable + raw_dataset_id (str): the raw_dataset_id used to get the date_range variable + raw_table_id (str): the raw_table_id used to get the date_range variable + mode (str): the mode used to get the date_range variable Returns: - dict: datetime range + list[dict]: the variables to be used in DBT + Union[list[dict], dict, None]: the date variable (date_range or run_date) + bool: a flag that indicates if the date_range variable came from Redis """ - start = ( - (timestamp - timedelta(seconds=interval)) - .astimezone(tz=timezone("UTC")) - .strftime("%Y-%m-%d %H:%M:%S") - ) + log(f"Creating DBT variables. Parameter received: {dbt_vars}") - end = timestamp.astimezone(tz=timezone("UTC")).strftime("%Y-%m-%d %H:%M:%S") + if (not dbt_vars) or (not table_id): + log("dbt_vars or table_id are blank. 
Skipping task") + return [None], None, False - return {"start": start, "end": end} + final_vars = [] + date_var = None + flag_date_range = False + if "date_range" in dbt_vars.keys(): + log("Creating date_range variable") -@task(checkpoint=False, nout=2) -def create_request_params( - datetime_range: dict, table_params: dict, secret_path: str, dataset_id: str -) -> tuple: - """ - Task to create request params + # Set date_range variable manually + if dict_contains_keys( + dbt_vars["date_range"], ["date_range_start", "date_range_end"] + ): + date_var = { + "date_range_start": dbt_vars["date_range"]["date_range_start"], + "date_range_end": dbt_vars["date_range"]["date_range_end"], + } + # Create date_range using Redis + else: + raw_table_id = raw_table_id or table_id - Args: - datetime_range (dict): datetime range to get params - table_params (dict): table params to get params - secret_path (str): secret path to get params - dataset_id (str): dataset id to get params + date_var = get_materialization_date_range.run( + dataset_id=dataset_id, + table_id=table_id, + raw_dataset_id=raw_dataset_id, + raw_table_id=raw_table_id, + table_run_datetime_column_name=dbt_vars["date_range"].get( + "table_run_datetime_column_name" + ), + mode=mode, + delay_hours=dbt_vars["date_range"].get("delay_hours", 0), + ) - Returns: - request_params: host, database and query to request data - request_url: url to request data - """ + flag_date_range = True - if dataset_id == constants.BILHETAGEM_DATASET_ID.value: - secrets = get_vault_secret(secret_path)["data"] + final_vars.append(date_var.copy()) - database_secrets = secrets["databases"][table_params["database"]] + log(f"date_range created: {date_var}") - request_url = secrets["vpn_url"] + database_secrets["engine"] + elif "run_date" in dbt_vars.keys(): + log("Creating run_date variable") - request_params = { - "host": database_secrets["host"], # TODO: exibir no log em ambiente fechado - "database": table_params["database"], - "query": table_params["query"].format(**datetime_range), - } + date_var = get_run_dates.run( + dbt_vars["run_date"].get("date_range_start"), + dbt_vars["run_date"].get("date_range_end"), + ) + final_vars.append([d.copy() for d in date_var]) - return request_params, request_url + log(f"run_date created: {date_var}") + + if "version" in dbt_vars.keys(): + log("Creating version variable") + dataset_sha = fetch_dataset_sha.run(dataset_id=dataset_id) + + # if there are other variables inside the list, update each item adding the version variable + if final_vars: + final_vars = get_join_dict.run(dict_list=final_vars, new_dict=dataset_sha) + else: + final_vars.append(dataset_sha) + + log(f"version created: {dataset_sha}") + + log(f"All variables were created, final value is: {final_vars}") + + return final_vars, date_var, flag_date_range diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index 9ddf7d687..f9b98afab 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -8,12 +8,18 @@ from pathlib import Path from datetime import timedelta, datetime -from typing import List +from typing import List, Union +import traceback import io +import json +import zipfile +import pytz +import requests import basedosdados as bd from basedosdados import Table import pandas as pd -import pytz +from google.cloud.storage.blob import Blob + from prefect.schedules.clocks import IntervalClock @@ -398,46 +404,40 @@ def data_info_str(data: pd.DataFrame): def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-locals - 
interval: timedelta, + clock_interval: timedelta, labels: List[str], - table_parameters: list, - dataset_id: str, - secret_path: str, + table_parameters: Union[list[dict], dict], runs_interval_minutes: int = 15, start_date: datetime = datetime( 2020, 1, 1, tzinfo=pytz.timezone(emd_constants.DEFAULT_TIMEZONE.value) ), + **general_flow_params, ) -> List[IntervalClock]: """ Generates multiple schedules Args: - interval (timedelta): The interval to run the schedule + clock_interval (timedelta): The interval to run the schedule labels (List[str]): The labels to be added to the schedule - table_parameters (list): The table parameters - dataset_id (str): The dataset_id to be used in the schedule - secret_path (str): The secret path to be used in the schedule + table_parameters (Union[list[dict], dict]): The table parameters to iterate over runs_interval_minutes (int, optional): The interval between each schedule. Defaults to 15. start_date (datetime, optional): The start date of the schedule. Defaults to datetime(2020, 1, 1, tzinfo=pytz.timezone(emd_constants.DEFAULT_TIMEZONE.value)). - + general_flow_params: Any other param that you want to pass to the flow Returns: List[IntervalClock]: The list of schedules """ + if isinstance(table_parameters, dict): + table_parameters = [table_parameters] clocks = [] for count, parameters in enumerate(table_parameters): - parameter_defaults = { - "table_params": parameters, - "dataset_id": dataset_id, - "secret_path": secret_path, - "interval": interval.total_seconds(), - } + parameter_defaults = parameters | general_flow_params log(f"parameter_defaults: {parameter_defaults}") clocks.append( IntervalClock( - interval=interval, + interval=clock_interval, start_date=start_date + timedelta(minutes=runs_interval_minutes * count), labels=labels, @@ -445,3 +445,317 @@ def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-l ) ) return clocks + + +def dict_contains_keys(input_dict: dict, keys: list[str]) -> bool: + """ + Test if the input dict has all keys present in the list + + Args: + input_dict (dict): the dict to check for the keys + keys (list[str]): the list containing the keys to check + Returns: + bool: True if the input_dict has all the keys, otherwise False + """ + return all(x in input_dict.keys() for x in keys) + + +def save_raw_local_func( + data: Union[dict, str], filepath: str, mode: str = "raw", filetype: str = "json" +) -> str: + """ + Saves raw data (json, csv or txt) to a local file. + Args: + data (Union[dict, str]): Raw data returned from the API or GCS + filepath (str): Path template to which to save the raw file + mode (str, optional): Folder to save locally; also the folder to upload to in GCS. + filetype (str, optional): File extension of the raw file. Defaults to "json". 
+ + Returns: + str: Path to the saved file + """ + + # handle the different file types to save + _filepath = filepath.format(mode=mode, filetype=filetype) + Path(_filepath).parent.mkdir(parents=True, exist_ok=True) + + if filetype == "json": + # parse JSON strings (e.g. data read from GCS) before dumping + if isinstance(data, str): + data = json.loads(data) + json.dump(data, Path(_filepath).open("w", encoding="utf-8")) + + if filetype in ("txt", "csv"): + with open(_filepath, "w", encoding="utf-8") as file: + file.write(data) + + log(f"Raw data saved to: {_filepath}") + return _filepath + + +def get_raw_data_api( # pylint: disable=R0912 + url: str, + secret_path: str = None, + api_params: dict = None, + filetype: str = None, +) -> tuple[str, str, str]: + """ + Request data from an API URL + + Args: + url (str): URL to request data + secret_path (str, optional): Secret path to get headers. Defaults to None. + api_params (dict, optional): Parameters to pass to API. Defaults to None. + filetype (str, optional): Filetype to save raw file. Defaults to None. + + Returns: + tuple[str, str, str]: Error, data and filetype + """ + error = None + data = None + try: + if secret_path is None: + headers = secret_path + else: + headers = get_vault_secret(secret_path)["data"] + + response = requests.get( + url, + headers=headers, + timeout=constants.MAX_TIMEOUT_SECONDS.value, + params=api_params, + ) + + response.raise_for_status() + + if filetype == "json": + data = response.json() + else: + data = response.text + + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return error, data, filetype + + +def get_upload_storage_blob( + dataset_id: str, + filename: str, +) -> Blob: + """ + Get a blob from upload zone in storage + + Args: + dataset_id (str): The dataset id on BigQuery. + filename (str): The filename in GCS. + + Returns: + Blob: blob object + """ + bucket = bd.Storage(dataset_id="", table_id="") + blob_list = list( + bucket.client["storage_staging"] + .bucket(bucket.bucket_name) + .list_blobs(prefix=f"upload/{dataset_id}/{filename}.") + ) + return blob_list[0] + + +def get_raw_data_gcs( + dataset_id: str, + table_id: str, + zip_filename: str = None, +) -> tuple[str, str, str]: + """ + Get raw data from GCS + + Args: + dataset_id (str): The dataset id on BigQuery. + table_id (str): The table id on BigQuery. + zip_filename (str, optional): The zip file name. Defaults to None. + + Returns: + tuple[str, str, str]: Error, data and filetype + """ + error = None + data = None + filetype = None + + try: + blob_search_name = zip_filename or table_id + blob = get_upload_storage_blob(dataset_id=dataset_id, filename=blob_search_name) + + filename = blob.name + filetype = filename.split(".")[-1] + + data = blob.download_as_bytes() + + if filetype == "zip": + with zipfile.ZipFile(io.BytesIO(data), "r") as zipped_file: + filenames = zipped_file.namelist() + filename = list( + filter(lambda x: x.split(".")[0] == table_id, filenames) + )[0] + filetype = filename.split(".")[-1] + data = zipped_file.read(filename) + + data = data.decode(encoding="utf-8") + + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return error, data, filetype + + +def save_treated_local_func( + filepath: str, data: pd.DataFrame, error: str, mode: str = "staging" +) -> str: + """ + Save treated file to CSV. 
+ + Args: + filepath (str): Path to save file + data (pd.DataFrame): Dataframe to save + error (str): Error caught during execution + mode (str, optional): Folder to save locally; also the folder to upload to in GCS. + + Returns: + str: Path to the saved file + """ + _filepath = filepath.format(mode=mode, filetype="csv") + Path(_filepath).parent.mkdir(parents=True, exist_ok=True) + if error is None: + data.to_csv(_filepath, index=False) + log(f"Treated data saved to: {_filepath}") + return _filepath + + +def upload_run_logs_to_bq( # pylint: disable=R0913 + dataset_id: str, + parent_table_id: str, + timestamp: datetime, + error: str = None, + previous_error: str = None, + recapture: bool = False, + mode: str = "raw", +): + """ + Upload execution status table to BigQuery. + Table is uploaded to the same dataset, named {parent_table_id}_logs. + + Args: + dataset_id (str): dataset_id on BigQuery + parent_table_id (str): table_id on BigQuery + timestamp (datetime): timestamp of the capture / flow run + error (str): error caught during execution + previous_error (str): previous error caught during execution + recapture (bool): if the execution was a recapture + mode (str): folder to save locally, later folder which to upload to GCS + + Returns: + None + """ + table_id = parent_table_id + "_logs" + # Create partition directory + filename = f"{table_id}_{timestamp.isoformat()}" + partition = f"data={timestamp.date()}" + filepath = Path( + f"""data/{mode}/{dataset_id}/{table_id}/{partition}/{filename}.csv""" + ) + filepath.parent.mkdir(exist_ok=True, parents=True) + # Create dataframe to be uploaded + if not error and recapture is True: + # if the recapture succeeded, update the erro column + dataframe = pd.DataFrame( + { + "timestamp_captura": [timestamp], + "sucesso": [True], + "erro": [f"[recapturado]{previous_error}"], + } + ) + log(f"Recapturing {timestamp} with previous error:\n{previous_error}") + else: + # not recapturing or error during flow execution + dataframe = pd.DataFrame( + { + "timestamp_captura": [timestamp], + "sucesso": [error is None], + "erro": [error], + } + ) + # Save data locally + dataframe.to_csv(filepath, index=False) + # Upload to Storage + create_or_append_table( + dataset_id=dataset_id, + table_id=table_id, + path=filepath.as_posix(), + partitions=partition, + ) + if error is not None: + raise Exception(f"Pipeline failed with error: {error}") + + +def get_datetime_range( + timestamp: datetime, + interval: timedelta, +) -> dict: + """ + Get datetime range in UTC + + Args: + timestamp (datetime): timestamp to get datetime range + interval (timedelta): interval to get datetime range + + Returns: + dict: datetime range + """ + + start = ( + (timestamp - interval) + .astimezone(tz=pytz.timezone("UTC")) + .strftime("%Y-%m-%d %H:%M:%S") + ) + + end = timestamp.astimezone(tz=pytz.timezone("UTC")).strftime("%Y-%m-%d %H:%M:%S") + + return {"start": start, "end": end} + + +def read_raw_data(filepath: str, csv_args: dict = None) -> tuple[str, pd.DataFrame]: + """ + Read raw data from file + + Args: + filepath (str): filepath to read + csv_args (dict): arguments to pass to pandas.read_csv + + Returns: + tuple[str, pd.DataFrame]: error and data + """ + error = None + data = None + try: + file_type = filepath.split(".")[-1] + + if file_type == "json": + data = pd.read_json(filepath) + elif file_type in ("txt", "csv"): + if csv_args is None: + csv_args = {} + data = pd.read_csv(filepath, **csv_args) + 
else: + error = "Unsupported raw file extension. Supported only: json, csv and txt" + + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return error, data diff --git a/pipelines/rj_smtr/veiculo/flows.py b/pipelines/rj_smtr/veiculo/flows.py index 28188a129..e1fab515e 100644 --- a/pipelines/rj_smtr/veiculo/flows.py +++ b/pipelines/rj_smtr/veiculo/flows.py @@ -30,7 +30,7 @@ every_day_hour_seven, ) from pipelines.rj_smtr.tasks import ( - create_date_partition, + create_date_hour_partition, create_local_partition_path, get_current_timestamp, get_raw, @@ -71,7 +71,7 @@ ) # SETUP # - partitions = create_date_partition(timestamp) + partitions = create_date_hour_partition(timestamp, partition_date_only=True) filename = parse_timestamp_to_string(timestamp) @@ -140,7 +140,7 @@ ) # SETUP # - partitions = create_date_partition(timestamp) + partitions = create_date_hour_partition(timestamp, partition_date_only=True) filename = parse_timestamp_to_string(timestamp) diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 73474d4a7..b155e21b2 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -711,16 +711,24 @@ def get_credentials_from_env( return cred -def get_storage_blobs(dataset_id: str, table_id: str) -> list: +def get_storage_blobs(dataset_id: str, table_id: str, mode: str = "staging") -> list: """ Get all blobs from a table in a dataset. + + Args: + dataset_id (str): dataset id + table_id (str): table id + mode (str, optional): mode to use. Defaults to "staging". + + Returns: + list: list of blobs """ bd_storage = bd.Storage(dataset_id=dataset_id, table_id=table_id) return list( bd_storage.client["storage_staging"] .bucket(bd_storage.bucket_name) - .list_blobs(prefix=f"staging/{bd_storage.dataset_id}/{bd_storage.table_id}/") + .list_blobs(prefix=f"{mode}/{bd_storage.dataset_id}/{bd_storage.table_id}/") )
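Reviewer note: the nesting step inside transform_raw_to_nested_structure is hard to follow in the diff, so here is a minimal, self-contained sketch of what that groupby/to_json pipeline does on a toy dataframe. The column names "id" and "valor" and the timestamp value are illustrative placeholders, not taken from the pipeline.

# -*- coding: utf-8 -*-
# Standalone sketch of the nesting step used in transform_raw_to_nested_structure.
import pandas as pd

data = pd.DataFrame({"id": [1, 2], "valor": [10, 20]})
data["timestamp_captura"] = "2023-01-01 00:00:00"

primary_key = ["id"]
pk_cols = primary_key + ["timestamp_captura"]

# Every column outside the primary key + capture timestamp is serialized to JSON
# and stored in a single "content" column, one row per primary key value.
data = (
    data.groupby(pk_cols)
    .apply(lambda x: x[data.columns.difference(pk_cols)].to_json(orient="records"))
    .str.strip("[]")
    .reset_index(name="content")[primary_key + ["content", "timestamp_captura"]]
)
print(data)
# Each row now carries its payload as a JSON object, e.g. {"valor":10}, keyed by id
# and timestamp_captura, which is the shape expected by the staging tables.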
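Similarly, the reworked generate_execute_schedules no longer hard-codes dataset_id/secret_path: each clock's parameter_defaults is the per-table dict merged with whatever extra keyword arguments the schedule passes in via **general_flow_params. A toy sketch of just that merge is below; all values are placeholders, not taken from the repo.

# Toy illustration of the parameter merge done inside generate_execute_schedules:
# `parameters | general_flow_params` is a dict union (Python 3.9+), so the shared
# flow params are added to every table's defaults (on duplicate keys, the values
# from general_flow_params take precedence).
table_parameters = [
    {"table_id": "tabela_a", "primary_key": ["id"]},
    {"table_id": "tabela_b", "primary_key": ["id"]},
]
general_flow_params = {"dataset_id": "exemplo_dataset", "secret_path": "exemplo_secret"}

for count, parameters in enumerate(table_parameters):
    parameter_defaults = parameters | general_flow_params
    print(count, parameter_defaults)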