From 35c80d4532adc2c4ee12b300d4de201ee934bd7a Mon Sep 17 00:00:00 2001
From: Rafael
Date: Tue, 17 Oct 2023 14:34:31 -0300
Subject: [PATCH] generalize query logs function
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pipelines/rj_smtr/tasks.py |  99 +++++------------------
 pipelines/rj_smtr/utils.py | 123 +++++++++++++++++++++++++++++++++++++
 2 files changed, 139 insertions(+), 83 deletions(-)

diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py
index 3969f28b9..2b733aef0 100644
--- a/pipelines/rj_smtr/tasks.py
+++ b/pipelines/rj_smtr/tasks.py
@@ -26,7 +26,6 @@
     bq_project,
     get_table_min_max_value,
     get_last_run_timestamp,
-    log_critical,
     data_info_str,
     dict_contains_keys,
     get_raw_data_api,
@@ -37,6 +36,7 @@
     read_raw_data,
     save_treated_local_func,
     save_raw_local_func,
+    query_logs_func,
 )
 from pipelines.utils.execute_dbt_model.utils import get_dbt_client
 from pipelines.utils.utils import log, get_redis_client, get_vault_secret
@@ -370,6 +370,7 @@ def query_logs(
     table_id: str,
     datetime_filter=None,
     max_recaptures: int = 60,
+    interval_minutes: int = 1,
 ):
     """
     Queries capture logs to check for errors
@@ -380,92 +381,22 @@ def query_logs(
         datetime_filter (pendulum.datetime.DateTime, optional):
             filter passed to query. This task will query the logs table
             for the last 1 day before datetime_filter
+        max_recaptures (int, optional): maximum number of recaptures to be done
+        interval_minutes (int, optional): interval in minutes between each recapture
 
     Returns:
-        list: containing timestamps for which the capture failed
+        tuple: whether any capture failed (bool),
+            timestamps to recapture (list of pendulum.datetime.DateTime),
+            previous error messages (list)
     """
-    if not datetime_filter:
-        datetime_filter = pendulum.now(constants.TIMEZONE.value).replace(
-            second=0, microsecond=0
-        )
-    elif isinstance(datetime_filter, str):
-        datetime_filter = datetime.fromisoformat(datetime_filter).replace(
-            second=0, microsecond=0
-        )
-
-    query = f"""
-    with t as (
-        select
-            datetime(timestamp_array) as timestamp_array
-        from
-            unnest(GENERATE_TIMESTAMP_ARRAY(
-                timestamp_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day),
-                timestamp('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'),
-                interval 1 minute)
-            ) as timestamp_array
-        where timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'
-    ),
-    logs as (
-        select
-            *,
-            timestamp_trunc(timestamp_captura, minute) as timestamp_array
-        from
-            rj-smtr.{dataset_id}.{table_id}_logs
-        where
-            data between
-                date(datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}',
-                interval 1 day))
-                and date('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}')
-            and
-            timestamp_captura between
-                datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day)
-                and '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'
-        order by timestamp_captura
+    return query_logs_func(
+        dataset_id=dataset_id,
+        table_id=table_id,
+        datetime_filter=datetime_filter,
+        max_recaptures=max_recaptures,
+        interval_minutes=interval_minutes,
     )
-    select
-        case
-            when logs.timestamp_captura is not null then logs.timestamp_captura
-            else t.timestamp_array
-        end as timestamp_captura,
-        logs.erro
-    from
-        t
-    left join
-        logs
-    on
-        logs.timestamp_array = t.timestamp_array
-    where
-        logs.sucesso is not True
-    order by
-        timestamp_captura
-    """
-    log(f"Run query to check logs:\n{query}")
-    results = bd.read_sql(query=query, billing_project_id=bq_project())
-    if len(results) > 0:
results["timestamp_captura"] = ( - pd.to_datetime(results["timestamp_captura"]) - .dt.tz_localize(constants.TIMEZONE.value) - .to_list() - ) - log(f"Recapture data for the following {len(results)} timestamps:\n{results}") - if len(results) > max_recaptures: - message = f""" - [SPPO - Recaptures] - Encontradas {len(results)} timestamps para serem recapturadas. - Essa run processará as seguintes: - ##### - {results[:max_recaptures]} - ##### - Sobraram as seguintes para serem recapturadas na próxima run: - ##### - {results[max_recaptures:]} - ##### - """ - log_critical(message) - results = results[:max_recaptures] - return True, results["timestamp_captura"].to_list(), results["erro"].to_list() - return False, [], [] @task @@ -543,6 +474,7 @@ def create_request_params( table_id: str, dataset_id: str, timestamp: datetime, + interval_minutes: int, ) -> tuple[str, str]: """ Task to create request params @@ -552,6 +484,7 @@ def create_request_params( table_id (str): table_id on BigQuery dataset_id (str): dataset_id on BigQuery timestamp (datetime): timestamp for flow run + interval_minutes (int): interval in minutes between each capture Returns: request_params: host, database and query to request data @@ -567,7 +500,7 @@ def create_request_params( request_url = database["host"] datetime_range = get_datetime_range( - timestamp=timestamp, interval=timedelta(**extract_params["run_interval"]) + timestamp=timestamp, interval=timedelta(minutes=interval_minutes) ) request_params = { diff --git a/pipelines/rj_smtr/utils.py b/pipelines/rj_smtr/utils.py index 16ed538d3..41dc1dd02 100644 --- a/pipelines/rj_smtr/utils.py +++ b/pipelines/rj_smtr/utils.py @@ -8,6 +8,7 @@ from pathlib import Path from datetime import timedelta, datetime, date +import pendulum from typing import List, Union, Any import traceback import io @@ -449,6 +450,128 @@ def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-l return clocks +def query_logs_func( + dataset_id: str, + table_id: str, + datetime_filter=None, + max_recaptures: int = 60, + interval_minutes: int = 1, +): + """ + Queries capture logs to check for errors + + Args: + dataset_id (str): dataset_id on BigQuery + table_id (str): table_id on BigQuery + datetime_filter (pendulum.datetime.DateTime, optional): + filter passed to query. 
+            for the day before datetime_filter
+        max_recaptures (int, optional): maximum number of recaptures to be done
+        interval_minutes (int, optional): interval in minutes between each recapture
+
+    Returns:
+        tuple: whether any capture failed (bool),
+            timestamps to recapture (list of pendulum.datetime.DateTime),
+            previous error messages (list)
+    """
+
+    if not datetime_filter:
+        datetime_filter = pendulum.now(constants.TIMEZONE.value).replace(
+            second=0, microsecond=0
+        )
+    elif isinstance(datetime_filter, str):
+        datetime_filter = datetime.fromisoformat(datetime_filter).replace(
+            second=0, microsecond=0
+        )
+
+    datetime_filter = datetime_filter.strftime("%Y-%m-%d %H:%M:%S")
+
+    query = f"""
+    WITH
+        t AS (
+            SELECT
+                DATETIME(timestamp_array) AS timestamp_array
+            FROM
+                UNNEST(
+                    GENERATE_TIMESTAMP_ARRAY(
+                        TIMESTAMP_SUB('{datetime_filter}', INTERVAL 1 day),
+                        TIMESTAMP('{datetime_filter}'),
+                        INTERVAL {interval_minutes} minute)
+                ) AS timestamp_array
+            WHERE
+                timestamp_array < '{datetime_filter}'
+        ),
+        logs_table AS (
+            SELECT
+                SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura),
+                    "America/Sao_Paulo") AS DATETIME) timestamp_captura,
+                SAFE_CAST(sucesso AS BOOLEAN) sucesso,
+                SAFE_CAST(erro AS STRING) erro,
+                SAFE_CAST(DATA AS DATE) DATA
+            FROM
+                rj-smtr-staging.{dataset_id}_staging.{table_id}_logs
+        ),
+        logs AS (
+            SELECT
+                *,
+                TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array
+            FROM
+                logs_table
+            WHERE
+                DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter}',
+                    INTERVAL 1 day))
+                    AND DATE('{datetime_filter}')
+                AND timestamp_captura BETWEEN
+                    DATETIME_SUB('{datetime_filter}', INTERVAL 1 day)
+                    AND '{datetime_filter}'
+            ORDER BY
+                timestamp_captura
+        )
+    SELECT
+        CASE
+            WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura
+            ELSE t.timestamp_array
+        END AS timestamp_captura,
+        logs.erro
+    FROM
+        t
+    LEFT JOIN
+        logs
+    ON
+        logs.timestamp_array = t.timestamp_array
+    WHERE
+        logs.sucesso IS NOT TRUE
+    ORDER BY
+        timestamp_captura
+    """
+    log(f"Run query to check logs:\n{query}")
+    results = bd.read_sql(query=query, billing_project_id=bq_project())
+    if len(results) > 0:
+        results["timestamp_captura"] = (
+            pd.to_datetime(results["timestamp_captura"])
+            .dt.tz_localize(constants.TIMEZONE.value)
+            .to_list()
+        )
+        log(f"Recapture data for the following {len(results)} timestamps:\n{results}")
+        if len(results) > max_recaptures:
+            message = f"""
+            [SPPO - Recaptures]
+            Encontradas {len(results)} timestamps para serem recapturadas.
+            Essa run processará as seguintes:
+            #####
+            {results[:max_recaptures]}
+            #####
+            Sobraram as seguintes para serem recapturadas na próxima run:
+            #####
+            {results[max_recaptures:]}
+            #####
+            """
+            log_critical(message)
+            results = results[:max_recaptures]
+        return True, results["timestamp_captura"].to_list(), results["erro"].to_list()
+    return False, [], []
+
+
 def dict_contains_keys(input_dict: dict, keys: list[str]) -> bool:
     """
     Test if the input dict has all keys present in the list
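
A minimal usage sketch of the generalized helper, not part of the patch itself: the dataset_id, table_id and datetime_filter values below are illustrative placeholders, and actually running it needs BigQuery credentials for the billing project returned by bq_project().

    # Usage sketch only: assumes this patch is applied; IDs are illustrative.
    from pipelines.rj_smtr.utils import query_logs_func

    # Look for failed captures in the day before the filter, for a table
    # captured every 10 minutes (interval_minutes replaces the hard-coded
    # 1-minute step of the old GENERATE_TIMESTAMP_ARRAY).
    errors, timestamps, previous_errors = query_logs_func(
        dataset_id="br_rj_riodejaneiro_onibus_gps",  # illustrative dataset_id
        table_id="registros",                        # illustrative table_id
        datetime_filter="2023-10-17T14:00:00",       # ISO strings are accepted
        max_recaptures=60,
        interval_minutes=10,
    )

    if errors:
        print(f"{len(timestamps)} timestamps to recapture: {timestamps}")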