From 17a23ea271e4612ff18045754451a1154ba72650 Mon Sep 17 00:00:00 2001
From: eng-rodrigocunha <66736583+eng-rodrigocunha@users.noreply.github.com>
Date: Mon, 19 Feb 2024 18:08:08 +0000
Subject: [PATCH] Deploying to gh-pages from @ prefeitura-rio/pipelines@46db4af5edec2bb18cef543590a006d7e2a89225 🚀
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 rj_smtr/constants.html                   | 1131 ++++++++++++++++++++++
 rj_smtr/projeto_subsidio_sppo/flows.html |  185 ++--
 rj_smtr/projeto_subsidio_sppo/tasks.html |  299 +++++-
 rj_smtr/tasks.html                       |    1 -
 rj_smtr/utils.html                       |  297 +++++-
 5 files changed, 1837 insertions(+), 76 deletions(-)

diff --git a/rj_smtr/constants.html b/rj_smtr/constants.html
index d1481c213..8dd518587 100644
--- a/rj_smtr/constants.html
+++ b/rj_smtr/constants.html
@@ -188,11 +188,564 @@
pipelines.rj_smtr.constants
var SUBSIDIO_SPPO_CODE_OWNERS
var SUBSIDIO_SPPO_DASHBOARD_DATASET_ID
var SUBSIDIO_SPPO_DATA_CHECKS_PARAMS
var SUBSIDIO_SPPO_DATA_CHECKS_POS_LIST
var SUBSIDIO_SPPO_DATA_CHECKS_PRE_LIST
var SUBSIDIO_SPPO_RECURSOS_DATASET_ID
var SUBSIDIO_SPPO_SECRET_PATH
var SUBSIDIO_SPPO_TABLE_ID
STU_MODE_MAPPING
STU_TABLE_CAPTURE_PARAMS
STU_TYPE_MAPPING
SUBSIDIO_SPPO_CODE_OWNERS
SUBSIDIO_SPPO_DASHBOARD_DATASET_ID
SUBSIDIO_SPPO_DASHBOARD_TABLE_ID
SUBSIDIO_SPPO_DATASET_ID
SUBSIDIO_SPPO_DATA_CHECKS_PARAMS
SUBSIDIO_SPPO_DATA_CHECKS_POS_LIST
SUBSIDIO_SPPO_DATA_CHECKS_PRE_LIST
SUBSIDIO_SPPO_RECURSOS_DATASET_ID
SUBSIDIO_SPPO_RECURSOS_MATERIALIZACAO_PARAMS
SUBSIDIO_SPPO_RECURSOS_TABLE_IDS
SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH
SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS
SUBSIDIO_SPPO_RECURSO_TABLE_CAPTURE_PARAMS
SUBSIDIO_SPPO_SECRET_PATH
SUBSIDIO_SPPO_TABLE_ID
TASK_MAX_RETRIES
TASK_RETRY_DELAY
diff --git a/rj_smtr/projeto_subsidio_sppo/flows.html b/rj_smtr/projeto_subsidio_sppo/flows.html
Module pipelines.rj_smtr.projeto_subsidio_sppo.flows
+Functions
+
+
+def SPPO_VEICULO_DIA_RUN_WAIT_FALSE()
+
+Source code (pdoc renders only the fragment inside the call; the surrounding
+task(...) wrapper is inferred from the Prefect pattern for anonymous tasks):
+
+SPPO_VEICULO_DIA_RUN_WAIT_FALSE = task(
+    lambda: [None], checkpoint=False, name="assign_none_to_previous_runs"
+)
+
@@ -328,6 +372,11 @@ Index
pipelines.rj_smtr.projeto_subsidio_sppo
+Functions
+
+
diff --git a/rj_smtr/projeto_subsidio_sppo/tasks.html b/rj_smtr/projeto_subsidio_sppo/tasks.html
index 5969cc281..43ae3a3cd 100644
--- a/rj_smtr/projeto_subsidio_sppo/tasks.html
+++ b/rj_smtr/projeto_subsidio_sppo/tasks.html
@@ -32,15 +32,158 @@ Module pipelines.rj_smtr.projeto_subsidio_sppo.tasks
+    return param is None
+
+
+@task
+def subsidio_data_quality_check(
+    mode: str, params: dict, code_owners: list = None, check_params: dict = None
+) -> bool:
+    """
+    Checks data quality for the subsidy calculation (apuração de subsídio) process
+
+    Args:
+        mode (str): Execution mode (pre or pos)
+        params (dict): Parameters for the checks
+        code_owners (list): Code owners to be notified
+        check_params (dict): queries and order columns for the checks
+
+    Returns:
+        test_check (bool): True if all checks passed, False otherwise
+    """
+
+    if mode not in ["pre", "pos"]:
+        raise ValueError("Mode must be 'pre' or 'pos'")
+
+    if check_params is None:
+        check_params = smtr_constants.SUBSIDIO_SPPO_DATA_CHECKS_PARAMS.value
+
+    if code_owners is None:
+        code_owners = smtr_constants.SUBSIDIO_SPPO_CODE_OWNERS.value
+
+    checks = dict()
+
+    request_params = {
+        "start_timestamp": f"""{params["start_date"]} 00:00:00""",
+        # end_date 00:00 + 27h = 03:00 of the following day, presumably to
+        # capture trips of the operating day that close after midnight
+        "end_timestamp": (
+            datetime.strptime(params["end_date"], "%Y-%m-%d") + timedelta(hours=27)
+        ).strftime("%Y-%m-%d %H:%M:%S"),
+    }
+
+    if mode == "pos":
+        # pos checks close the window at end_date 00:00 and target the dashboard dataset
+        request_params["end_timestamp"] = f"""{params["end_date"]} 00:00:00"""
+        request_params[
+            "dataset_id"
+        ] = smtr_constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value
+
+    checks_list = (
+        smtr_constants.SUBSIDIO_SPPO_DATA_CHECKS_PRE_LIST.value
+        if mode == "pre"
+        else smtr_constants.SUBSIDIO_SPPO_DATA_CHECKS_POS_LIST.value
+    )
+
+    for (
+        table_id,
+        test_check_list,
+    ) in checks_list.items():
+        checks[table_id] = perform_checks_for_table(
+            table_id, request_params, test_check_list, check_params
+        )
+
+    log(checks)
+
+    date_range = (
+        params["start_date"]
+        if params["start_date"] == params["end_date"]
+        else f'{params["start_date"]} a {params["end_date"]}'
+    )
+
+    webhook_url = get_vault_secret(
+        secret_path=smtr_constants.SUBSIDIO_SPPO_SECRET_PATH.value
+    )["data"]["discord_data_check_webhook"]
+
+    test_check = all(
+        table["status"] for sublist in checks.values() for table in sublist
+    )
+
+    formatted_messages = [
+        ":green_circle: " if test_check else ":red_circle: ",
+        f"**{mode.capitalize()}-Data Quality Checks - Apuração de Subsídio - {date_range}**\n\n",
+    ]
+
+    if "general" in checks:
+        formatted_messages.extend(
+            f'{":white_check_mark:" if check["status"] else ":x:"} {check["desc"]}\n'
+            for check in checks["general"]
+        )
+
+    format_send_discord_message(formatted_messages, webhook_url)
+
+    for table_id, checks_ in checks.items():
+        if table_id != "general":
+            formatted_messages = [
+                f"*{table_id}:*\n"
+                + "\n".join(
+                    f'{":white_check_mark:" if check["status"] else ":x:"} {check["desc"]}'
+                    for check in checks_
+                )
+            ]
+            format_send_discord_message(formatted_messages, webhook_url)
+
+    formatted_messages = ["\n\n"]
+
+    if mode == "pre":
+        formatted_messages.append(
+            ""
+            if test_check
+            else """:warning: **Status:** Necessidade de revisão dos dados de entrada!\n"""
+        )
+
+    if mode == "pos":
+        formatted_messages.append(
+            ":tada: **Status:** Sucesso"
+            if test_check
+            else ":warning: **Status:** Testes falharam. Necessidade de revisão dos dados finais!\n"
+        )
+
+    if not test_check:
+        # Discord mention syntax: <@id> user, <@!id> nickname, <#id> channel, <@&id> role
+        at_code_owners = [
+            f' - <@{constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["user_id"]}>\n'
+            if constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["type"] == "user"
+            else f' - <@!{constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["user_id"]}>\n'
+            if constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["type"]
+            == "user_nickname"
+            else f' - <#{constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["user_id"]}>\n'
+            if constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["type"] == "channel"
+            else f' - <@&{constants.OWNERS_DISCORD_MENTIONS.value[code_owner]["user_id"]}>\n'
+            for code_owner in code_owners
+        ]
+
+        formatted_messages.extend(at_code_owners)
+
+    format_send_discord_message(formatted_messages, webhook_url)
+
+    return test_check
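
A minimal sketch of how this task might be wired into a Prefect flow (the flow name and dates below are hypothetical; the real wiring lives in pipelines.rj_smtr.projeto_subsidio_sppo.flows):

from prefect import Flow

with Flow("subsidio_sppo_apuracao_example") as flow:
    params = {"start_date": "2024-02-01", "end_date": "2024-02-07"}
    # "pre" checks run against the input data before the subsidy calculation...
    pre_ok = subsidio_data_quality_check(mode="pre", params=params)
    # ...and "pos" checks validate the final dashboard tables afterwards
    pos_ok = subsidio_data_quality_check(mode="pos", params=params)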
@@ -67,6 +210,159 @@ Functions
return param is None
+def subsidio_data_quality_check(mode: str, params: dict, code_owners: list = None, check_params: dict = None) ‑> bool
+
+Checks data quality for the subsidy calculation (apuração de subsídio) process
+
+Args:
+    mode : str
+    params : dict
+    code_owners : list
+    check_params : dict
+
+Returns:
+    test_check (bool): True if all checks passed, False otherwise
+pipelines.rj_smtr.tasks

diff --git a/rj_smtr/utils.html b/rj_smtr/utils.html
Module pipelines.rj_smtr.utils
+def format_send_discord_message(formatted_messages: list, webhook_url: str)
+
+Format and send a message to discord
+
+Args:
+    formatted_messages : list
+    webhook_url : str
+
+Returns:
+    None
+def format_send_discord_message(formatted_messages: list, webhook_url: str):
+    """
+    Format and send a message to discord
+
+    Args:
+        formatted_messages (list): The formatted messages
+        webhook_url (str): The webhook url
+
+    Returns:
+        None
+    """
+    formatted_message = "".join(formatted_messages)
+    log(formatted_message)
+    msg_ext = len(formatted_message)
+    if msg_ext > 2000:
+        log(
+            f"** Message too long ({msg_ext} characters), will be split into multiple messages **"
+        )
+        # Split message into lines
+        lines = formatted_message.split("\n")
+        message_chunks = []
+        chunk = ""
+        for line in lines:
+            if len(chunk) + len(line) + 1 > 2000:  # +1 for the newline character
+                message_chunks.append(chunk)
+                chunk = ""
+            chunk += line + "\n"
+        message_chunks.append(chunk)  # Append the last chunk
+        for chunk in message_chunks:
+            send_discord_message(
+                message=chunk,
+                webhook_url=webhook_url,
+            )
+    else:
+        send_discord_message(
+            message=formatted_message,
+            webhook_url=webhook_url,
+        )
+
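Discord caps a single webhook message at 2,000 characters, which is why the helper splits long reports on newlines before sending. The same splitting logic as a standalone sketch (the function name is illustrative, not part of the pipeline):

def split_into_chunks(text: str, limit: int = 2000) -> list:
    """Split text on newlines so each chunk stays within the limit."""
    chunks, chunk = [], ""
    for line in text.split("\n"):
        if len(chunk) + len(line) + 1 > limit:  # +1 for the newline
            chunks.append(chunk)
            chunk = ""
        chunk += line + "\n"
    chunks.append(chunk)  # the last, possibly short, chunk
    return chunks

Note that a single line longer than the limit still yields an oversized chunk; the helper assumes individual check lines stay well under 2,000 characters.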
def generate_df_and_save(data: dict, fname: pathlib.Path)
+def perform_check(desc: str, check_params: dict, request_params: dict) ‑> dict
+
+Perform a check on a query
+
+Args:
+    desc : str
+    check_params : dict
+    request_params : dict
+
+Returns:
+    dict
+def perform_check(desc: str, check_params: dict, request_params: dict) -> dict:
+    """
+    Perform a check on a query
+
+    Args:
+        desc (str): The check description
+        check_params (dict): The check parameters
+            * query (str): SQL query to be executed
+            * order_columns (list): order columns for query log results, in case of failure (optional)
+        request_params (dict): The request parameters
+
+    Returns:
+        dict: The check status
+    """
+    try:
+        q = check_params["query"].format(**request_params)
+        order_columns = check_params.get("order_columns", None)
+    except KeyError as e:
+        raise ValueError(f"Missing key in check_params: {e}") from e
+
+    log(q)
+    df = bd.read_sql(q)
+
+    # the check passes when the query returns no offending rows
+    check_status = df.empty
+
+    check_status_dict = {"desc": desc, "status": check_status}
+
+    log(f"Check status:\n{check_status_dict}")
+
+    if not check_status:
+        log(f"Data info:\n{data_info_str(df)}")
+        log(
+            f"Sorted data:\n{df.sort_values(by=order_columns) if order_columns else df}"
+        )
+
+    return check_status_dict
+
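perform_check treats an empty result set as a pass, so each configured query is written to return only the offending rows. A hypothetical shape for one check_params entry (the real templates live in smtr_constants.SUBSIDIO_SPPO_DATA_CHECKS_PARAMS; table and column names here are illustrative):

example_check_params = {
    "expression_is_true": {
        # placeholders are filled from request_params via str.format
        "query": """
            SELECT data, servico
            FROM {dataset_id}.{table_id}
            WHERE DATE(data) BETWEEN DATE("{start_timestamp}") AND DATE("{end_timestamp}")
              AND NOT ({expression})
        """,
        # used only to sort the offending rows in the failure log
        "order_columns": ["data", "servico"],
    },
}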
+def perform_checks_for_table(table_id: str, request_params: dict, test_check_list: dict, check_params: dict) ‑> list
+
+Perform checks for a table
+
+Args:
+    table_id : str
+    request_params : dict
+    test_check_list : dict
+    check_params : dict
+
+Returns:
+    list
+def perform_checks_for_table(
+    table_id: str, request_params: dict, test_check_list: dict, check_params: dict
+) -> list:
+    """
+    Perform checks for a table
+
+    Args:
+        table_id (str): The table id
+        request_params (dict): The request parameters
+        test_check_list (dict): The test check list
+        check_params (dict): The check parameters
+
+    Returns:
+        list: The check results for the table
+    """
+    request_params["table_id"] = table_id
+    checks = list()
+
+    for description, test_check in test_check_list.items():
+        request_params["expression"] = test_check.get("expression", "")
+        checks.append(
+            perform_check(
+                description,
+                check_params.get(test_check.get("test", "expression_is_true")),
+                request_params | test_check.get("params", {}),
+            )
+        )
+
+    return checks
+
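Each test_check_list entry maps a human-readable description to its test configuration: "test" selects the query template from check_params (defaulting to "expression_is_true"), while "expression" and "params" fill the template's placeholders (merged into request_params with the | dict-union operator, so Python 3.9+ is required). A hypothetical entry, not taken from the real PRE/POS lists:

example_test_check_list = {
    "Todas as viagens possuem distância planejada positiva": {
        "test": "expression_is_true",
        "expression": "distancia_planejada > 0",
        "params": {},  # extra placeholders for the query template
    },
}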
def read_raw_data(filepath: str, reader_args: dict = None) ‑> tuple[str, pandas.core.frame.DataFrame]
execute_db_query
filter_data
filter_null
format_send_discord_message
generate_df_and_save
generate_execute_schedules
get_datetime_range
get_upload_storage_blob
log_critical
map_dict_keys
perform_check
perform_checks_for_table
read_raw_data
safe_cast
save_raw_local_func