diff --git a/pipelines/rj_cor/comando/eventos/constants.py b/pipelines/rj_cor/comando/eventos/constants.py index e1b8a0635..6fcf8d9f0 100644 --- a/pipelines/rj_cor/comando/eventos/constants.py +++ b/pipelines/rj_cor/comando/eventos/constants.py @@ -17,7 +17,7 @@ class constants(Enum): # pylint: disable=c0103 TABLE_ID_EVENTOS = "ocorrencias_nova_api" REDIS_NAME = "last_update" TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis_nova_api" - # TABLE_ID_POPS = "procedimento_operacional_padrao" + TABLE_ID_POPS = "procedimento_operacional_padrao_nova_api" # TABLE_ID_ATIVIDADES_POPS = "procedimento_operacional_padrao_orgaos_responsaveis" RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS = { "redis_data_key": "data_alagamento_recente_comando", diff --git a/pipelines/rj_cor/comando/eventos/flows.py b/pipelines/rj_cor/comando/eventos/flows.py index c50fffeee..166ae474d 100644 --- a/pipelines/rj_cor/comando/eventos/flows.py +++ b/pipelines/rj_cor/comando/eventos/flows.py @@ -16,14 +16,16 @@ from pipelines.rj_cor.comando.eventos.constants import ( constants as comando_constants, ) -from pipelines.rj_cor.comando.eventos.schedules import every_hour # , every_month +from pipelines.rj_cor.comando.eventos.schedules import every_hour, every_month from pipelines.rj_cor.comando.eventos.tasks import ( download_data_ocorrencias, download_data_atividades, + download_data_pops, get_date_interval, - get_redis_df, + # get_redis_df, get_redis_max_date, save_data, + save_no_partition, save_redis_max_date, treat_data_ocorrencias, treat_data_atividades, @@ -293,35 +295,18 @@ dfr = download_data_atividades(first_date, last_date) - dfr_redis = get_redis_df( + redis_max_date = get_redis_max_date( dataset_id=dataset_id, table_id=table_id, name=redis_name, mode=redis_mode, ) - dfr_treated, dfr_redis = treat_data_atividades( + dfr_treated, redis_max_date = treat_data_atividades( dfr, - dfr_redis=dfr_redis, - columns=["id_evento", "data_inicio", "sigla", "descricao", "status"], + redis_max_date, ) - # dfr = compare_actual_df_with_redis_df( - # dfr, - # dfr_redis=dfr_redis, - # columns=columns, - - # ) - - # save_redis_df( - # dfr_redis, - # dataset_id, - # table_id, - # redis_name, - # keep_n_days=1, - # mode = mode, - # ) - path = save_data(dfr_treated) task_upload = create_table_and_upload_to_gcs( @@ -333,6 +318,15 @@ wait=path, ) + save_redis_max_date( + dataset_id=dataset_id, + table_id=table_id, + name=redis_name, + mode=redis_mode, + redis_max_date=redis_max_date, + wait=task_upload, + ) + # Warning: this task won't execute if we provide a date interval # on parameters. The reason this happens is for if we want to # perform backfills, it won't mess with the Redis interval. 
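The hunk above replaces the old full-dataframe Redis comparison with a lighter checkpoint: only the maximum event timestamp is kept in Redis (get_redis_max_date), treat_data_atividades returns the new maximum, and save_redis_max_date persists it only after the GCS upload succeeds. A minimal sketch of what such a checkpoint pair could look like, assuming a plain redis client and an invented key layout (the actual tasks in tasks.py may store the value differently):

import redis

def _key(dataset_id: str, table_id: str, name: str, mode: str) -> str:
    # Invented key layout, e.g. "prod.dataset.ocorrencias_orgaos_responsaveis_nova_api.last_update"
    return f"{mode}.{dataset_id}.{table_id}.{name}"

def get_max_date(dataset_id: str, table_id: str, name: str = "last_update", mode: str = "prod") -> str:
    client = redis.Redis(host="localhost", decode_responses=True)
    # Fall back to a very old timestamp so the first run never skips
    return client.get(_key(dataset_id, table_id, name, mode)) or "1990-01-01 00:00:00"

def save_max_date(dataset_id: str, table_id: str, name: str, mode: str, max_date: str) -> None:
    client = redis.Redis(host="localhost", decode_responses=True)
    client.set(_key(dataset_id, table_id, name, mode), max_date)

Because the timestamps are zero-padded "%Y-%m-%d %H:%M:%S" strings, the plain string comparison max_date <= redis_max_date used in the treat tasks orders them correctly.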
@@ -357,12 +351,12 @@ project_name=constants.PREFECT_DEFAULT_PROJECT.value, parameters={ "dataset_id": dataset_id, - "table_id": "ocorrencias_orgaos_responsaveis", # change to table_id + "table_id": table_id, "mode": materialization_mode, "materialize_to_datario": materialize_to_datario, }, labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.ocorrencias_orgaos_responsaveis", + run_name=f"Materialize {dataset_id}.{table_id}", ) materialization_flow.set_upstream(task_upload) @@ -416,198 +410,114 @@ rj_cor_comando_atividades_evento_flow.schedule = every_hour -# with Flow( -# "COR: Comando - POPs e Atividades dos POPs", -# code_owners=[ -# "paty", -# ], -# ) as rj_cor_comando_pops_flow: -# # Dump mode -# dump_mode = Parameter("dump_mode", default="overwrite", required=False) - -# # Materialization parameters -# materialize_after_dump = Parameter( -# "materialize_after_dump", default=False, required=False -# ) -# materialization_mode = Parameter( -# "materialization_mode", default="prod", required=False -# ) -# materialize_to_datario = Parameter( -# "materialize_to_datario", default=False, required=False -# ) - -# # Dump to GCS after? Should only dump to GCS if materializing to datario -# dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False) -# maximum_bytes_processed = Parameter( -# "maximum_bytes_processed", -# required=False, -# default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, -# ) - -# dataset_id = Parameter( -# "dataset_id", default=comando_constants.DATASET_ID.value, required=False -# ) -# table_id_pops = Parameter( -# "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False -# ) -# table_id_atividades_pops = Parameter( -# "table_id_atividades_pops", -# default=comando_constants.TABLE_ID_ATIVIDADES_POPS.value, -# required=False, -# ) - -# pops = get_pops() -# redis_pops = get_on_redis(dataset_id, table_id_atividades_pops, mode="dev") -# atividades_pops, update_pops_redis = get_atividades_pops( -# pops=pops, redis_pops=redis_pops -# ) -# has_update = not_none(update_pops_redis) - -# path_pops = save_no_partition(dataframe=pops) - -# task_upload_pops = create_table_and_upload_to_gcs( -# data_path=path_pops, -# dataset_id=dataset_id, -# table_id=table_id_pops, -# biglake_table=False, -# dump_mode=dump_mode, -# ) - -# with case(has_update, True): -# path_atividades_pops = save_no_partition( -# dataframe=atividades_pops, append=False -# ) - -# task_upload_atividades_pops = create_table_and_upload_to_gcs( -# data_path=path_atividades_pops, -# dataset_id=dataset_id, -# table_id=table_id_atividades_pops, -# biglake_table=False, -# dump_mode="overwrite", -# ) - -# save_on_redis( -# dataset_id, -# table_id_atividades_pops, -# "dev", -# update_pops_redis, -# wait=task_upload_atividades_pops, -# ) - -# with case(materialize_after_dump, True): -# # Trigger DBT flow run -# current_flow_labels = get_current_flow_labels() - -# materialization_pops_flow = create_flow_run( -# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "dataset_id": dataset_id, -# "table_id": table_id_pops, -# "mode": materialization_mode, -# "materialize_to_datario": materialize_to_datario, -# }, -# labels=current_flow_labels, -# run_name=f"Materialize {dataset_id}.{table_id_pops}", -# ) -# materialization_pops_flow.set_upstream(task_upload_pops) - -# materialization_atividades_pops_flow = create_flow_run( -# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, -# 
project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "dataset_id": dataset_id, -# "table_id": table_id_atividades_pops, -# "mode": materialization_mode, -# "materialize_to_datario": materialize_to_datario, -# }, -# labels=current_flow_labels, -# run_name=f"Materialize {dataset_id}.{table_id_atividades_pops}", -# ) -# materialization_atividades_pops_flow.set_upstream(task_upload_atividades_pops) - -# wait_for_pops_materialization = wait_for_flow_run( -# materialization_pops_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) -# wait_for_pops_materialization.max_retries = ( -# dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value -# ) -# wait_for_pops_materialization.retry_delay = timedelta( -# seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value -# ) - -# wait_for_atividades_pops_materialization = wait_for_flow_run( -# materialization_atividades_pops_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) -# wait_for_atividades_pops_materialization.max_retries = ( -# dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value -# ) -# wait_for_atividades_pops_materialization.retry_delay = timedelta( -# seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value -# ) - -# with case(dump_to_gcs, True): -# # Trigger Dump to GCS flow run with project id as datario -# dump_pops_to_gcs_flow = create_flow_run( -# flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "project_id": "datario", -# "dataset_id": dataset_id, -# "table_id": table_id_pops, -# "maximum_bytes_processed": maximum_bytes_processed, -# }, -# labels=[ -# "datario", -# ], -# run_name=f"Dump to GCS {dataset_id}.{table_id_pops}", -# ) -# dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization) - -# dump_atividades_pops_to_gcs_flow = create_flow_run( -# flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "project_id": "datario", -# "dataset_id": dataset_id, -# "table_id": table_id_atividades_pops, -# "maximum_bytes_processed": maximum_bytes_processed, -# }, -# labels=[ -# "datario", -# ], -# run_name=f"Dump to GCS {dataset_id}.{table_id_atividades_pops}", -# ) -# dump_atividades_pops_to_gcs_flow.set_upstream( -# wait_for_atividades_pops_materialization -# ) - -# wait_for_dump_pops_to_gcs = wait_for_flow_run( -# dump_pops_to_gcs_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) - -# wait_for_dump_atividades_pops_to_gcs = wait_for_flow_run( -# dump_atividades_pops_to_gcs_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) - -# rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# rj_cor_comando_pops_flow.run_config = KubernetesRun( -# image=constants.DOCKER_IMAGE.value, -# labels=[ -# constants.RJ_COR_AGENT_LABEL.value, -# ], -# ) - -# rj_cor_comando_pops_flow.schedule = every_month +with Flow( + "COR: Comando - POPs e Atividades dos POPs", + code_owners=[ + "paty", + ], +) as rj_cor_comando_pops_flow: + # Dump mode + dump_mode = Parameter("dump_mode", default="overwrite", required=False) + + # Materialization parameters + materialize_after_dump = Parameter( + "materialize_after_dump", default=False, required=False + ) + materialization_mode = Parameter( + "materialization_mode", default="prod", required=False + ) + materialize_to_datario = Parameter( + 
"materialize_to_datario", default=False, required=False + ) + + # Dump to GCS after? Should only dump to GCS if materializing to datario + dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False) + maximum_bytes_processed = Parameter( + "maximum_bytes_processed", + required=False, + default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, + ) + + dataset_id = Parameter( + "dataset_id", default=comando_constants.DATASET_ID.value, required=False + ) + table_id_pops = Parameter( + "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False + ) + + pops = download_data_pops() + path_pops = save_no_partition(dataframe=pops) + + task_upload_pops = create_table_and_upload_to_gcs( + data_path=path_pops, + dataset_id=dataset_id, + table_id=table_id_pops, + biglake_table=False, + dump_mode=dump_mode, + ) + + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + + materialization_pops_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id_pops, + "mode": materialization_mode, + "materialize_to_datario": materialize_to_datario, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id_pops}", + ) + materialization_pops_flow.set_upstream(task_upload_pops) + + wait_for_pops_materialization = wait_for_flow_run( + materialization_pops_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_pops_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_pops_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(dump_to_gcs, True): + # Trigger Dump to GCS flow run with project id as datario + dump_pops_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": dataset_id, + "table_id": table_id_pops, + "maximum_bytes_processed": maximum_bytes_processed, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {dataset_id}.{table_id_pops}", + ) + dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization) + + wait_for_dump_pops_to_gcs = wait_for_flow_run( + dump_pops_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + +rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +rj_cor_comando_pops_flow.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[ + constants.RJ_COR_AGENT_LABEL.value, + ], +) + +rj_cor_comando_pops_flow.schedule = every_month diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 6efd18fec..6ba6c177d 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -1,11 +1,5 @@ # -*- coding: utf-8 -*- -# pylint: disable=R0914,W0613,W0102,W0613,R0912,R0915,E1136,E1137,W0702 # flake8: noqa: E722 -# TODO: colocar id_pops novos -# TODO: gerar alerta quando tiver id_pop novo -# TODO: apagar histórico da nova api para ter o id_pop novo -# TODO: criar tabela dim do id_pop novo -# TODO: salvar no redis o máximo entre as colunas de data_inicio e data_fim, seguir flow só se tiver novidades em alguma dessas colunas """ Tasks for comando """ @@ 
-26,12 +20,10 @@ from prefect.engine.state import Skipped # from prefect.triggers import all_successful -# url_eventos = "http://aplicativo.cocr.com.br/comando/ocorrencias_api_nova" -from pipelines.rj_cor.utils import compare_actual_df_with_redis_df +# from pipelines.rj_cor.utils import compare_actual_df_with_redis_df from pipelines.rj_cor.comando.eventos.utils import ( # build_redis_key, - format_date, - treat_wrong_id_pop, + download_data, ) from pipelines.utils.utils import ( build_redis_key, @@ -44,22 +36,20 @@ ) -@task -def get_date_interval(first_date, last_date) -> Tuple[dict, str]: +@task(nout=2) +def get_date_interval(first_date: str = None, last_date: str = None): """ - If `first_date` and `last_date` are provided, format it to DD/MM/YYYY. Else, - get data from last 3 days. + If `first_date` and `last_date` are provided, convert it to pendulum + and add one day to `last_date`. Else, get data from last 3 days. first_date: str YYYY-MM-DD last_date: str YYYY-MM-DD """ if first_date and last_date: - first_date, last_date = format_date(first_date, last_date) + first_date = pendulum.from_format(first_date, "YYYY-MM-DD") + last_date = pendulum.from_format(last_date, "YYYY-MM-DD").add(days=1) else: - last_date = pendulum.today(tz="America/Sao_Paulo").date() - first_date = last_date.subtract(days=3) - first_date, last_date = format_date( - first_date.strftime("%Y-%m-%d"), last_date.strftime("%Y-%m-%d") - ) + last_date = pendulum.today(tz="America/Sao_Paulo").date().add(days=1) + first_date = last_date.subtract(days=4) return first_date, last_date @@ -146,18 +136,22 @@ def save_redis_max_date( # pylint: disable=too-many-arguments max_retries=3, retry_delay=timedelta(seconds=60), ) -def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: +def download_data_ocorrencias( + first_date: pendulum, + last_date: pendulum, + wait=None, # pylint: disable=unused-argument +) -> pd.DataFrame: """ - Download data from API + Download data from API adding one day at a time so we can save + the date in a new column `data_particao` that will be used to + create the partitions when saving data. """ # auth_token = get_token() url_secret = get_vault_secret("comando")["data"] - url_eventos = url_secret["endpoint_eventos"] - - log(f"\n\nDownloading data from {first_date} to {last_date} (not included)") - dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}") + url = url_secret["endpoint_eventos"] + dfr = download_data(first_date, last_date, url) return dfr @@ -165,7 +159,7 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: def treat_data_ocorrencias( dfr: pd.DataFrame, redis_max_date: str, -) -> Tuple[pd.DataFrame, pd.DataFrame]: +) -> Tuple[pd.DataFrame, str]: """ Rename cols and normalize data. 
""" @@ -177,7 +171,6 @@ def treat_data_ocorrencias( "pop_id": "id_pop", "inicio": "data_inicio", "fim": "data_fim", - "pop": "pop_titulo", "titulo": "pop_especificacao", } ) @@ -206,11 +199,14 @@ def treat_data_ocorrencias( "Secundária": "Secundario", } ) - dfr["descricao"] = dfr["descricao"].apply(unidecode) + categorical_cols = ["pop", "descricao", "bairro", "gravidade", "status"] + for i in categorical_cols: + dfr[i] = dfr[i].str.capitalize() + dfr[i] = dfr[i].apply(unidecode) mandatory_cols = [ - "id_pop", "id_evento", + "pop", "bairro", "data_inicio", "data_fim", @@ -221,6 +217,7 @@ def treat_data_ocorrencias( "longitude", "status", "tipo", + "create_partition", ] # Create cols if they don exist on new API for col in mandatory_cols: @@ -233,27 +230,19 @@ def treat_data_ocorrencias( "gravidade", "status", "tipo", - "pop_titulo", + "pop", ] for i in categorical_cols: dfr[i] = dfr[i].str.capitalize() - # This treatment is temporary. Now the id_pop from API is comming with the same value as id_evento - dfr = treat_wrong_id_pop(dfr) - log(f"This id_pop are missing {dfr[dfr.id_pop.isna()]} they were replaced by 99") - dfr["id_pop"] = dfr["id_pop"].fillna(99) - - # Treat id_pop col - dfr["id_pop"] = dfr["id_pop"].astype(float).astype(int) - for col in ["data_inicio", "data_fim"]: dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S") # Set the order to match the original table dfr = dfr[mandatory_cols] - # Create a column with time of row creation to keep last event on dbt - dfr["created_at"] = pendulum.now(tz="America/Sao_Paulo").strftime( + # Create a column with time of row update to keep last event on dbt + dfr["last_updated_at"] = pendulum.now(tz="America/Sao_Paulo").strftime( "%Y-%m-%d %H:%M:%S" ) @@ -265,49 +254,71 @@ def treat_data_ocorrencias( max_retries=3, retry_delay=timedelta(seconds=60), ) -def download_data_atividades(first_date, last_date, wait=None) -> pd.DataFrame: +def download_data_atividades( + first_date, + last_date, + wait=None, # pylint: disable=unused-argument +) -> pd.DataFrame: """ Download data from API """ url_secret = get_vault_secret("comando")["data"] - url_atividades_evento = url_secret["endpoint_atividades_evento"] - - dfr = pd.read_json( - f"{url_atividades_evento}/?data_i={first_date}&data_f={last_date}" - ) + url = url_secret["endpoint_atividades_evento"] + dfr = download_data(first_date, last_date, url) return dfr +# @task(nout=2) +# def treat_data_atividades( +# dfr: pd.DataFrame, +# dfr_redis: pd.DataFrame, +# columns: list, +# ) -> Tuple[pd.DataFrame, pd.DataFrame]: @task(nout=2) def treat_data_atividades( dfr: pd.DataFrame, - dfr_redis: pd.DataFrame, - columns: list, -) -> Tuple[pd.DataFrame, pd.DataFrame]: + redis_max_date: str, +) -> Tuple[pd.DataFrame, str]: """ Normalize data to be similiar to old API. 
""" print("Start treating data") + dfr["id_evento"] = dfr["id_evento"].astype(float).astype(int).astype(str) dfr.orgao = dfr.orgao.replace(["\r", "\n"], ["", ""], regex=True) print(f"Dataframe before comparing with last data saved on redis {dfr.head()}") + for col in ["data_inicio", "data_fim", "data_chegada"]: + dfr[col] = pd.to_datetime(dfr[col], errors="coerce") - dfr, dfr_redis = compare_actual_df_with_redis_df( - dfr, - dfr_redis, - columns, - ) - print(f"Dataframe after comparing with last data saved on redis {dfr.head()}") + max_date = dfr[["data_inicio", "data_fim", "data_chegada"]].max().max() + max_date = max_date.strftime("%Y-%m-%d %H:%M:%S") + + log(f"Last API data was {max_date} and last redis uptade was {redis_max_date}") - # If df is empty stop flow - if dfr.shape[0] == 0: + if max_date <= redis_max_date: skip_text = "No new data available on API" print(skip_text) raise ENDRUN(state=Skipped(skip_text)) + # Get new max_date to save on redis + redis_max_date = max_date + + # dfr, dfr_redis = compare_actual_df_with_redis_df( + # dfr, + # dfr_redis, + # columns, + # ) + # print(f"Dataframe after comparing with last data saved on redis {dfr.head()}") + + # # If df is empty stop flow + # if dfr.shape[0] == 0: + # skip_text = "No new data available on API" + # print(skip_text) + # raise ENDRUN(state=Skipped(skip_text)) + mandatory_cols = [ "id_evento", "sigla", @@ -317,6 +328,7 @@ def treat_data_atividades( "data_fim", "descricao", "status", + "create_partition", ] # Create cols if they don exist on new API @@ -334,14 +346,7 @@ def treat_data_atividades( print("\n\nDEBUG", dfr[categorical_cols]) for i in categorical_cols: dfr[i] = dfr[i].str.capitalize() - # dfr[i] = dfr[i].apply(unidecode) - - for col in ["data_inicio", "data_fim", "data_chegada"]: - dfr[col] = pd.to_datetime(dfr[col], errors="coerce") - - # TODO: Essa conversão é temporária - for col in ["data_inicio", "data_fim", "data_chegada"]: - dfr[col] = dfr[col].dt.tz_convert("America/Sao_Paulo") + dfr[i] = dfr[i].apply(unidecode) for col in ["data_inicio", "data_fim", "data_chegada"]: dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S") @@ -349,28 +354,33 @@ def treat_data_atividades( # Set the order to match the original table dfr = dfr[mandatory_cols] - # Create a column with time of row creation to keep last event on dbt - dfr["created_at"] = pendulum.now(tz="America/Sao_Paulo").strftime( + # Create a column with time of row update to keep last event on dbt + dfr["last_updated_at"] = pendulum.now(tz="America/Sao_Paulo").strftime( "%Y-%m-%d %H:%M:%S" ) - return dfr.drop_duplicates(), dfr_redis + return dfr.drop_duplicates(), redis_max_date @task def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: """ Save data on a csv file to be uploaded to GCP + PS: It's not mandatory to start an activity of an event. As a result we have some activities + without any start date, but with an end date. The main problem is that we can not create the + partition column from data_inicio, that's why we created the column create_partition when + requesting the API. 
""" + log(f"Data that will be saved {dataframe.iloc[0]}") prepath = Path("/tmp/data/") prepath.mkdir(parents=True, exist_ok=True) - partition_column = "data_inicio" + partition_column = "create_partition" dataframe, partitions = parse_date_columns(dataframe, partition_column) to_partitions( - data=dataframe, + data=dataframe.drop(columns="create_partition"), partition_columns=partitions, savepath=prepath, data_type="csv", @@ -379,6 +389,25 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: return prepath +@task( + nout=1, + max_retries=3, + retry_delay=timedelta(seconds=60), +) +def download_data_pops() -> pd.DataFrame: + """ + Download data from POP's API + """ + + url_secret = get_vault_secret("comando")["data"] + url = url_secret["endpoint_pops"] + + log("\n\nDownloading POP's data") + dfr = pd.read_json(f"{url}") + + return dfr + + @task def save_no_partition(dataframe: pd.DataFrame, append: bool = False) -> str: """ diff --git a/pipelines/rj_cor/comando/eventos/utils.py b/pipelines/rj_cor/comando/eventos/utils.py index 7bf879893..7e8d4b0db 100644 --- a/pipelines/rj_cor/comando/eventos/utils.py +++ b/pipelines/rj_cor/comando/eventos/utils.py @@ -3,17 +3,15 @@ General purpose functions for the comando project """ # pylint: disable=W0611 -import json +from urllib.error import HTTPError import requests from requests.adapters import HTTPAdapter, Retry -import pendulum import pandas as pd +import pendulum from pipelines.utils.utils import ( - get_redis_client, get_vault_secret, log, - treat_redis_output, ) @@ -29,58 +27,6 @@ def format_date(first_date, last_date): return first_date, last_date -def get_redis_output(redis_key, is_df: bool = False): - """ - Get Redis output. Use get to obtain a df from redis or hgetall if is a key value pair. - """ - redis_client = get_redis_client() # (host="127.0.0.1") - - if is_df: - json_data = redis_client.get(redis_key) - log(f"[DEGUB] json_data {json_data}") - if json_data: - # If data is found, parse the JSON string back to a Python object (dictionary) - data_dict = json.loads(json_data) - # Convert the dictionary back to a DataFrame - return pd.DataFrame(data_dict) - - return pd.DataFrame() - - output = redis_client.hgetall(redis_key) - if len(output) > 0: - output = treat_redis_output(output) - return output - - -def compare_actual_df_with_redis_df( - dfr: pd.DataFrame, - dfr_redis: pd.DataFrame, - columns: list, -) -> pd.DataFrame: - """ - Compare df from redis to actual df and return only the rows from actual df - that are not already saved on redis. 
- """ - for col in columns: - if col not in dfr_redis.columns: - dfr_redis[col] = None - dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtypes) - log(f"\nEnded conversion types from dfr to dfr_redis: \n{dfr_redis.dtypes}") - - dfr_diff = ( - pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True) - .query('_merge == "left_only"') - .drop("_merge", axis=1) - ) - log( - f"\nDf resulted from the difference between dft_redis and dfr: \n{dfr_diff.head()}" - ) - - updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]]) - - return dfr_diff, updated_dfr_redis - - def treat_wrong_id_pop(dfr): """ Create id_pop based on pop_titulo column @@ -175,3 +121,26 @@ def get_url(url, parameters: dict = None, token: str = None): # pylint: disable log(f"This resulted in the following error: {exc}") response = {"response": None} return response + + +def download_data(first_date, last_date, url) -> pd.DataFrame: + """ + Download data from API adding one day at a time so we can save + the date in a new column `data_particao` that will be used to + create the partitions when saving data. + """ + dfr = pd.DataFrame() + temp_date = first_date.add(days=1) + fmt = "%d/%m/%Y" + while temp_date <= last_date: + log(f"\n\nDownloading data from {first_date} to {temp_date} (not included)") + try: + dfr_temp = pd.read_json( + f"{url}/?data_i={first_date.strftime(fmt)}&data_f={temp_date.strftime(fmt)}" + ) + dfr_temp["create_partition"] = first_date.strftime("%Y-%m-%d") + dfr = pd.concat([dfr, dfr_temp]) + except HTTPError as error: + print(f"Error downloading this data: {error}") + first_date, temp_date = first_date.add(days=1), temp_date.add(days=1) + return dfr