Skip to content

Commit

Permalink
generaliza função query logs
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Oct 17, 2023
1 parent 0bf3ade commit 35c80d4
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 83 deletions.
99 changes: 16 additions & 83 deletions pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
bq_project,
get_table_min_max_value,
get_last_run_timestamp,
log_critical,
data_info_str,
dict_contains_keys,
get_raw_data_api,
Expand All @@ -37,6 +36,7 @@
read_raw_data,
save_treated_local_func,
save_raw_local_func,
query_logs_func,
)
from pipelines.utils.execute_dbt_model.utils import get_dbt_client
from pipelines.utils.utils import log, get_redis_client, get_vault_secret
Expand Down Expand Up @@ -370,6 +370,7 @@ def query_logs(
table_id: str,
datetime_filter=None,
max_recaptures: int = 60,
interval_minutes: int = 1,
):
"""
Queries capture logs to check for errors
Expand All @@ -380,92 +381,22 @@ def query_logs(
datetime_filter (pendulum.datetime.DateTime, optional):
filter passed to query. This task will query the logs table
for the last 1 day before datetime_filter
max_recaptures (int, optional): maximum number of recaptures to be done
interval_minutes (int, optional): interval in minutes between each recapture
Returns:
list: containing timestamps for which the capture failed
lists: errors (bool),
timestamps (list of pendulum.datetime.DateTime),
previous_errors (list of previous errors)
"""

if not datetime_filter:
datetime_filter = pendulum.now(constants.TIMEZONE.value).replace(
second=0, microsecond=0
)
elif isinstance(datetime_filter, str):
datetime_filter = datetime.fromisoformat(datetime_filter).replace(
second=0, microsecond=0
)

query = f"""
with t as (
select
datetime(timestamp_array) as timestamp_array
from
unnest(GENERATE_TIMESTAMP_ARRAY(
timestamp_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day),
timestamp('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'),
interval 1 minute)
) as timestamp_array
where timestamp_array < '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'
),
logs as (
select
*,
timestamp_trunc(timestamp_captura, minute) as timestamp_array
from
rj-smtr.{dataset_id}.{table_id}_logs
where
data between
date(datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}',
interval 1 day))
and date('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}')
and
timestamp_captura between
datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day)
and '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'
order by timestamp_captura
return query_logs_func(
dataset_id=dataset_id,
table_id=table_id,
datetime_filter=datetime_filter,
max_recaptures=max_recaptures,
interval_minutes=interval_minutes,
)
select
case
when logs.timestamp_captura is not null then logs.timestamp_captura
else t.timestamp_array
end as timestamp_captura,
logs.erro
from
t
left join
logs
on
logs.timestamp_array = t.timestamp_array
where
logs.sucesso is not True
order by
timestamp_captura
"""
log(f"Run query to check logs:\n{query}")
results = bd.read_sql(query=query, billing_project_id=bq_project())
if len(results) > 0:
results["timestamp_captura"] = (
pd.to_datetime(results["timestamp_captura"])
.dt.tz_localize(constants.TIMEZONE.value)
.to_list()
)
log(f"Recapture data for the following {len(results)} timestamps:\n{results}")
if len(results) > max_recaptures:
message = f"""
[SPPO - Recaptures]
Encontradas {len(results)} timestamps para serem recapturadas.
Essa run processará as seguintes:
#####
{results[:max_recaptures]}
#####
Sobraram as seguintes para serem recapturadas na próxima run:
#####
{results[max_recaptures:]}
#####
"""
log_critical(message)
results = results[:max_recaptures]
return True, results["timestamp_captura"].to_list(), results["erro"].to_list()
return False, [], []


@task
Expand Down Expand Up @@ -543,6 +474,7 @@ def create_request_params(
table_id: str,
dataset_id: str,
timestamp: datetime,
interval_minutes: int,
) -> tuple[str, str]:
"""
Task to create request params
Expand All @@ -552,6 +484,7 @@ def create_request_params(
table_id (str): table_id on BigQuery
dataset_id (str): dataset_id on BigQuery
timestamp (datetime): timestamp for flow run
interval_minutes (int): interval in minutes between each capture
Returns:
request_params: host, database and query to request data
Expand All @@ -567,7 +500,7 @@ def create_request_params(
request_url = database["host"]

datetime_range = get_datetime_range(
timestamp=timestamp, interval=timedelta(**extract_params["run_interval"])
timestamp=timestamp, interval=timedelta(minutes=interval_minutes)
)

request_params = {
Expand Down
123 changes: 123 additions & 0 deletions pipelines/rj_smtr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path

from datetime import timedelta, datetime, date
import pendulum
from typing import List, Union, Any
import traceback
import io
Expand Down Expand Up @@ -449,6 +450,128 @@ def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-l
return clocks


def query_logs_func(
    dataset_id: str,
    table_id: str,
    datetime_filter=None,
    max_recaptures: int = 60,
    interval_minutes: int = 1,
):
    """
    Query the staging capture-log table on BigQuery to find timestamps
    whose capture failed (or never happened) in the 1 day before
    ``datetime_filter``, so they can be recaptured.

    Args:
        dataset_id (str): dataset_id on BigQuery (the ``_staging`` suffix is
            appended inside the query)
        table_id (str): table_id on BigQuery (the ``_logs`` suffix is
            appended inside the query)
        datetime_filter (pendulum.DateTime | str | None, optional): upper
            bound for the query window. The logs table is queried for the
            1 day before this value. Accepts an ISO-format string; falsy
            values default to "now" in the project timezone, truncated to
            the minute.
        max_recaptures (int, optional): maximum number of timestamps
            returned for recapture in a single run; the surplus is only
            reported via log_critical and left for the next run
        interval_minutes (int, optional): expected spacing in minutes
            between consecutive captures (grid of timestamps to check)

    Returns:
        tuple: ``(errors, timestamps, previous_errors)`` where
            - errors (bool): True if at least one capture failed
            - timestamps (list): tz-aware timestamps to recapture
            - previous_errors (list): error message recorded for each
              returned timestamp (None for minutes with no log row)
    """

    if not datetime_filter:
        # Default window upper bound: current time in the project timezone,
        # truncated to the minute so it aligns with the timestamp grid.
        datetime_filter = pendulum.now(constants.TIMEZONE.value).replace(
            second=0, microsecond=0
        )
    elif isinstance(datetime_filter, str):
        datetime_filter = datetime.fromisoformat(datetime_filter).replace(
            second=0, microsecond=0
        )

    # From here on datetime_filter is a plain string, only used for SQL
    # interpolation below.
    datetime_filter = datetime_filter.strftime("%Y-%m-%d %H:%M:%S")

    # Query structure:
    #   t          - grid of expected capture timestamps, every
    #                `interval_minutes` minutes over the last 1 day
    #   logs_table - raw log rows with columns normalized via SAFE_CAST
    #   logs       - log rows restricted to the same 1-day window, keyed by
    #                minute-truncated capture timestamp
    # The final LEFT JOIN flags every grid slot whose log row is missing or
    # has sucesso != TRUE.
    # NOTE(review): the join key is TIMESTAMP_TRUNC(..., minute), while the
    # grid steps by `interval_minutes`; for interval_minutes > 1 a capture
    # logged off-grid would not match its slot — confirm logs are always
    # aligned to the capture interval.
    query = f"""
    WITH
      t AS (
      SELECT
        DATETIME(timestamp_array) AS timestamp_array
      FROM
        UNNEST(
            GENERATE_TIMESTAMP_ARRAY(
                TIMESTAMP_SUB('{datetime_filter}', INTERVAL 1 day),
                TIMESTAMP('{datetime_filter}'),
                INTERVAL {interval_minutes} minute) )
        AS timestamp_array
      WHERE
        timestamp_array < '{datetime_filter}' ),
      logs_table AS (
          SELECT
            SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura),
            "America/Sao_Paulo") AS DATETIME) timestamp_captura,
            SAFE_CAST(sucesso AS BOOLEAN) sucesso,
            SAFE_CAST(erro AS STRING) erro,
            SAFE_CAST(DATA AS DATE) DATA
          FROM
            rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t
      ),
      logs AS (
      SELECT
        *,
        TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array
      FROM
        logs_table
      WHERE
        DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter}',
            INTERVAL 1 day))
        AND DATE('{datetime_filter}')
        AND timestamp_captura BETWEEN
            DATETIME_SUB('{datetime_filter}', INTERVAL 1 day)
        AND '{datetime_filter}'
      ORDER BY
        timestamp_captura )
    SELECT
      CASE
        WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura
      ELSE
      t.timestamp_array
    END
      AS timestamp_captura,
      logs.erro
    FROM
      t
    LEFT JOIN
      logs
    ON
      logs.timestamp_array = t.timestamp_array
    WHERE
      logs.sucesso IS NOT TRUE
    ORDER BY
      timestamp_captura
    """
    log(f"Run query to check logs:\n{query}")
    results = bd.read_sql(query=query, billing_project_id=bq_project())
    if len(results) > 0:
        # BigQuery returns naive datetimes; localize them to the project
        # timezone before handing them back to the flow.
        results["timestamp_captura"] = (
            pd.to_datetime(results["timestamp_captura"])
            .dt.tz_localize(constants.TIMEZONE.value)
            .to_list()
        )
        log(f"Recapture data for the following {len(results)} timestamps:\n{results}")
        if len(results) > max_recaptures:
            # Too many failures for one run: alert via log_critical and cap
            # the batch; the remainder is picked up by the next run.
            message = f"""
            [SPPO - Recaptures]
            Encontradas {len(results)} timestamps para serem recapturadas.
            Essa run processará as seguintes:
            #####
            {results[:max_recaptures]}
            #####
            Sobraram as seguintes para serem recapturadas na próxima run:
            #####
            {results[max_recaptures:]}
            #####
            """
            log_critical(message)
            results = results[:max_recaptures]
        return True, results["timestamp_captura"].to_list(), results["erro"].to_list()
    return False, [], []


def dict_contains_keys(input_dict: dict, keys: list[str]) -> bool:
"""
Test if the input dict has all keys present in the list
Expand Down

0 comments on commit 35c80d4

Please sign in to comment.