From 8cfea43b0d9cbc7c3fdd70d6c4ce0b64162ad9cd Mon Sep 17 00:00:00 2001
From: patriciacatandi <62657143+patriciacatandi@users.noreply.github.com>
Date: Wed, 28 Feb 2024 13:11:23 +0000
Subject: [PATCH] =?UTF-8?q?Deploying=20to=20gh-pages=20from=20@=20prefeitu?=
 =?UTF-8?q?ra-rio/pipelines@770843827b370bb2d9de3700da7832cc59b1b3ed=20?=
 =?UTF-8?q?=F0=9F=9A=80?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 rj_cor/comando/eventos/constants.html |   9 +-
 rj_cor/comando/eventos/flows.html     | 287 +++++++++++++-------
 rj_cor/comando/eventos/tasks.html     | 370 ++++++++++++++++----------
 rj_cor/comando/eventos/utils.html     | 133 ++++++++-
 4 files changed, 568 insertions(+), 231 deletions(-)

diff --git a/rj_cor/comando/eventos/constants.html b/rj_cor/comando/eventos/constants.html
index 0a542f33c..3ee72eb45 100644
--- a/rj_cor/comando/eventos/constants.html
+++ b/rj_cor/comando/eventos/constants.html
@@ -45,7 +45,7 @@
pipelines.rj_cor.comando.eventos.constants
DATASET_ID = "adm_cor_comando"
TABLE_ID_EVENTOS = "ocorrencias_nova_api"
REDIS_NAME = "cor_api_last_days"
- # TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis"
+ TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis_nova_api"
# TABLE_ID_POPS = "procedimento_operacional_padrao"
# TABLE_ID_ATIVIDADES_POPS = "procedimento_operacional_padrao_orgaos_responsaveis"
RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS = {
@@ -238,7 +238,7 @@ var TABLE_ID_ATIVIDADES_EVENTOS
var TABLE_ID_EVENTOS
RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS
RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS
REDIS_NAME
TABLE_ID_ATIVIDADES_EVENTOS
TABLE_ID_EVENTOS
Module pipelines.rj_cor.comando.eventos.flows
Module pipelines.rj_cor.comando.eventos.tasks
-def compare_actual_df_with_redis_df(dfr: pandas.core.frame.DataFrame, dfr_redis: pandas.core.frame.DataFrame, columns: list) ‑> pandas.core.frame.DataFrame
+
+def download_data_atividades(first_date, last_date, wait=None) ‑> pandas.core.frame.DataFrame
-
-
-Compare df from redis to actual df and return only the rows from actual df
-that are not already saved on redis.
+Download data from the API.
Expand source code
-def compare_actual_df_with_redis_df(
- dfr: pd.DataFrame,
- dfr_redis: pd.DataFrame,
- columns: list,
-) -> pd.DataFrame:
+@task(
+ nout=1,
+ max_retries=3,
+ retry_delay=timedelta(seconds=60),
+)
+def download_data_atividades(first_date, last_date, wait=None) -> pd.DataFrame:
"""
- Compare df from redis to actual df and return only the rows from actual df
- that are not already saved on redis.
+ Download data from the API.
"""
- for col in columns:
- if col not in dfr_redis.columns:
- dfr_redis[col] = None
- dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtypes)
- log(f"\nEnded conversion types from dfr to dfr_redis: \n{dfr_redis.dtypes}")
- dfr_diff = (
- pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True)
- .query('_merge == "left_only"')
- .drop("_merge", axis=1)
- )
- log(
- f"\nDf resulted from the difference between dft_redis and dfr: \n{dfr_diff.head()}"
- )
+ url_secret = get_vault_secret("comando")["data"]
+ url_atividades_evento = url_secret["endpoint_atividades_evento"]
- updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]])
+ dfr = pd.read_json(
+ f"{url_atividades_evento}/?data_i={first_date}&data_f={last_date}"
+ )
- return dfr_diff, updated_dfr_redis
+ return dfr
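A minimal sketch of how the new task could be exercised outside a flow, assuming Prefect 1.x (where `.run()` invokes the task body directly) and that the endpoint accepts the `data_i`/`data_f` query parameters shown above; the date window is illustrative:

# Hypothetical date window; the format must match what the endpoint expects.
first_date, last_date = "2024-02-27", "2024-02-28"
# In Prefect 1.x, .run() executes the underlying function without a flow run.
dfr = download_data_atividades.run(first_date, last_date)
print(dfr.head())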
-
-def download_data(first_date, last_date, wait=None) ‑> pandas.core.frame.DataFrame
+
+def download_data_ocorrencias(first_date, last_date, wait=None) ‑> pandas.core.frame.DataFrame
-
Download data from API
@@ -397,11 +435,11 @@ Functions
Expand source code
@task(
- nout=3,
+ nout=1,
max_retries=3,
retry_delay=timedelta(seconds=60),
)
-def download_data(first_date, last_date, wait=None) -> pd.DataFrame:
+def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame:
"""
Download data from API
"""
@@ -409,7 +447,6 @@ Functions
url_secret = get_vault_secret("comando")["data"]
url_eventos = url_secret["endpoint_eventos"]
- ## url_atividades_evento = url_secret["endpoint_atividades_evento"]
dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}")
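The `nout` change above (3 to 1) reflects that the task now returns a single DataFrame rather than three values. A self-contained sketch of Prefect 1.x `nout` semantics, with illustrative names:

from prefect import Flow, task

@task(nout=2)
def split(x):
    # nout=2 tells Prefect this task yields two results,
    # so the call can be unpacked inside the flow context.
    return x // 10, x % 10

@task
def show(value):
    print(value)

with Flow("nout-demo") as flow:
    tens, units = split(42)
    show(tens)
    show(units)

flow.run()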
@@ -491,41 +528,6 @@ Functions
return dfr_redis
-
-def get_redis_output(redis_key, is_df: bool = False)
-
--
-
Get Redis output
-Example: {b'date': b'2023-02-27 07:29:04'}
-
-
-Expand source code
-
-def get_redis_output(redis_key, is_df: bool = False):
- """
- Get Redis output
- Example: {b'date': b'2023-02-27 07:29:04'}
- """
- redis_client = get_redis_client() # (host="127.0.0.1")
-
- if is_df:
- json_data = redis_client.get(redis_key)
- print(type(json_data))
- print(json_data)
- if json_data:
- # If data is found, parse the JSON string back to a Python object (dictionary)
- data_dict = json.loads(json_data)
- # Convert the dictionary back to a DataFrame
- return pd.DataFrame(data_dict)
-
- return pd.DataFrame()
-
- output = redis_client.hgetall(redis_key)
- if len(output) > 0:
- output = treat_redis_output(output)
- return output
-
-
def not_none(something: Any) ‑> bool
@@ -606,8 +608,94 @@ Functions
return path_to_directory
-
-def treat_data(dfr: pandas.core.frame.DataFrame, dfr_redis: pandas.core.frame.DataFrame, columns: list) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
+
+def treat_data_atividades(dfr: pandas.core.frame.DataFrame, dfr_redis: pandas.core.frame.DataFrame, columns: list) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
+
+-
+
+Normalize data to be similar to the old API.
+
+
+Expand source code
+
+@task(nout=2)
+def treat_data_atividades(
+ dfr: pd.DataFrame,
+ dfr_redis: pd.DataFrame,
+ columns: list,
+) -> Tuple[pd.DataFrame, pd.DataFrame]:
+ """
+ Normalize data to be similar to the old API.
+ """
+
+ print("Start treating data")
+ dfr.orgao = dfr.orgao.replace(["\r", "\n"], ["", ""], regex=True)
+
+ print(f"Dataframe before comparing with last data saved on redis {dfr.head()}")
+
+ dfr, dfr_redis = compare_actual_df_with_redis_df(
+ dfr,
+ dfr_redis,
+ columns,
+ )
+ print(f"Dataframe after comparing with last data saved on redis {dfr.head()}")
+
+ # If df is empty stop flow
+ if dfr.shape[0] == 0:
+ skip_text = "No new data available on API"
+ print(skip_text)
+ raise ENDRUN(state=Skipped(skip_text))
+
+ mandatory_cols = [
+ "id_evento",
+ "sigla",
+ "orgao", # esse não tem na tabela antiga
+ "data_chegada",
+ "data_inicio",
+ "data_fim",
+ "descricao",
+ "status",
+ ]
+
+ # Create cols if they don't exist in the new API
+ for col in mandatory_cols:
+ if col not in dfr.columns:
+ dfr[col] = None
+
+ categorical_cols = [
+ "sigla",
+ "orgao",
+ "descricao",
+ "status",
+ ]
+
+ print("\n\nDEBUG", dfr[categorical_cols])
+ for i in categorical_cols:
+ dfr[i] = dfr[i].str.capitalize()
+ # dfr[i] = dfr[i].apply(unidecode)
+
+ for col in ["data_inicio", "data_fim", "data_chegada"]:
+ dfr[col] = pd.to_datetime(dfr[col], errors="coerce")
+
+ # TODO: this conversion is temporary
+ for col in ["data_inicio", "data_fim", "data_chegada"]:
+ dfr[col] = dfr[col].dt.tz_convert("America/Sao_Paulo")
+
+ for col in ["data_inicio", "data_fim", "data_chegada"]:
+ dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S")
+
+ # Set the order to match the original table
+ dfr = dfr[mandatory_cols]
+
+ # Create a column with time of row creation to keep last event on dbt
+ dfr["created_at"] = pendulum.now(tz="America/Sao_Paulo").strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+
+ return dfr.drop_duplicates(), dfr_redis
+
+
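A self-contained sketch of the datetime normalization applied above, assuming the API returns timezone-aware ISO timestamps (`Series.dt.tz_convert` raises on naive values, which is presumably why the TODO marks the conversion as temporary):

import pandas as pd

raw = pd.Series(["2024-02-28T10:00:00+00:00", None])
col = pd.to_datetime(raw, errors="coerce")    # tz-aware UTC; None becomes NaT
col = col.dt.tz_convert("America/Sao_Paulo")  # shift to local time
print(col.dt.strftime("%Y-%m-%d %H:%M:%S"))   # NaT renders as NaN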
+
+def treat_data_ocorrencias(dfr: pandas.core.frame.DataFrame, dfr_redis: pandas.core.frame.DataFrame, columns: list) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
-
Rename cols and normalize data.
@@ -616,7 +704,7 @@ Functions
Expand source code
@task(nout=2)
-def treat_data(
+def treat_data_ocorrencias(
dfr: pd.DataFrame,
dfr_redis: pd.DataFrame,
columns: list,
@@ -651,7 +739,7 @@ Functions
if dfr.shape[0] == 0:
skip_text = "No new data available on API"
print(skip_text)
- # raise ENDRUN(state=Skipped(skip_text))
+ raise ENDRUN(state=Skipped(skip_text))
dfr["tipo"] = dfr["tipo"].replace(
{
@@ -699,6 +787,12 @@ Functions
# Treat id_pop col
dfr["id_pop"] = dfr["id_pop"].astype(float).astype(int)
+ for col in ["data_inicio", "data_fim"]:
+ dfr[col] = pd.to_datetime(dfr[col], errors="coerce")
+
+ for col in ["data_inicio", "data_fim"]:
+ dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S")
+
# Set the order to match the original table
dfr = dfr[mandatory_cols]
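The `errors="coerce"` option in the block above keeps a single malformed timestamp from failing the whole task; a minimal illustration:

import pandas as pd

s = pd.to_datetime(pd.Series(["2024-02-28 10:00:00", "not-a-date"]), errors="coerce")
print(s)  # the malformed entry becomes NaT instead of raising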
@@ -774,15 +868,15 @@ Index
Functions
diff --git a/rj_cor/comando/eventos/utils.html b/rj_cor/comando/eventos/utils.html
index fe92fef41..49ec71f86 100644
--- a/rj_cor/comando/eventos/utils.html
+++ b/rj_cor/comando/eventos/utils.html
@@ -37,7 +37,13 @@ Module pipelines.rj_cor.comando.eventos.utils
Module pipelines.rj_cor.comando.eventos.utils
Functions
return key
+
+def compare_actual_df_with_redis_df(dfr: pandas.core.frame.DataFrame, dfr_redis: pandas.core.frame.DataFrame, columns: list) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
+
+-
+
+Compare the df from Redis to the current df and return only the rows of the
+current df that are not already saved on Redis.
+
+
+Expand source code
+
+def compare_actual_df_with_redis_df(
+ dfr: pd.DataFrame,
+ dfr_redis: pd.DataFrame,
+ columns: list,
+) -> pd.DataFrame:
+ """
+ Compare the df from Redis to the current df and return only the rows of
+ the current df that are not already saved on Redis.
+ """
+ for col in columns:
+ if col not in dfr_redis.columns:
+ dfr_redis[col] = None
+ dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtypes)
+ log(f"\nEnded conversion types from dfr to dfr_redis: \n{dfr_redis.dtypes}")
+
+ dfr_diff = (
+ pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True)
+ .query('_merge == "left_only"')
+ .drop("_merge", axis=1)
+ )
+ log(
+ f"\nDf resulted from the difference between dft_redis and dfr: \n{dfr_diff.head()}"
+ )
+
+ updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]])
+
+ return dfr_diff, updated_dfr_redis
+
+
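The merge-with-indicator pattern above is a left anti-join: it keeps only the rows of dfr that have no match in dfr_redis. A self-contained example (column values are illustrative):

import pandas as pd

dfr = pd.DataFrame({"id_evento": [1, 2, 3], "status": ["Aberto", "Fechado", "Aberto"]})
dfr_redis = pd.DataFrame({"id_evento": [1], "status": ["Aberto"]})

new_rows = (
    pd.merge(dfr, dfr_redis, how="left", on=["id_evento", "status"], indicator=True)
    .query('_merge == "left_only"')  # keep rows present only in dfr
    .drop("_merge", axis=1)
)
print(new_rows)  # rows with id_evento 2 and 3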
def format_date(first_date, last_date)
@@ -196,6 +293,38 @@ Functions
return first_date, last_date
+
+def get_redis_output(redis_key, is_df: bool = False)
+
+-
+
+Get Redis output. Use get to obtain a df from Redis, or hgetall if it is a key-value pair.
+
+
+Expand source code
+
+def get_redis_output(redis_key, is_df: bool = False):
+ """
+ Get Redis output. Use get to obtain a df from Redis, or hgetall if it is a key-value pair.
+ """
+ redis_client = get_redis_client() # (host="127.0.0.1")
+
+ if is_df:
+ json_data = redis_client.get(redis_key)
+ log(f"[DEGUB] json_data {json_data}")
+ if json_data:
+ # If data is found, parse the JSON string back to a Python object (dictionary)
+ data_dict = json.loads(json_data)
+ # Convert the dictionary back to a DataFrame
+ return pd.DataFrame(data_dict)
+
+ return pd.DataFrame()
+
+ output = redis_client.hgetall(redis_key)
+ if len(output) > 0:
+ output = treat_redis_output(output)
+ return output
+
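A sketch of the round-trip the `is_df=True` branch assumes: some upstream task stores the DataFrame as a JSON string under the key. The storage call in the comment is an assumption, not taken from this diff:

import json
import pandas as pd

df = pd.DataFrame({"id_evento": [1, 2], "created_at": ["2024-02-28 10:00:00", "2024-02-28 11:00:00"]})

payload = json.dumps(df.to_dict())            # hypothetical upstream: redis_client.set(key, payload)
restored = pd.DataFrame(json.loads(payload))  # what the is_df=True branch rebuilds
print(restored)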
+
def get_token()
@@ -373,7 +502,9 @@ Index
Functions