Skip to content

Commit

Permalink
Merge branch 'master' into staging/smtr-add-stpl-rdo
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 27, 2023
2 parents 96dafaf + fdaf644 commit a6ced1c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 47 deletions.
16 changes: 5 additions & 11 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@

from pipelines.rj_smtr.schedules import every_hour, every_minute


GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
"secret_path": constants.BILHETAGEM_SECRET_PATH.value,
"source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
}

# Flows #

# BILHETAGEM TRANSAÇÃO - CAPTURA A CADA MINUTO #
Expand All @@ -59,7 +52,7 @@

bilhetagem_transacao_captura = set_default_parameters(
flow=bilhetagem_transacao_captura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value
| constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
)

Expand All @@ -77,7 +70,7 @@

bilhetagem_tracking_captura = set_default_parameters(
flow=bilhetagem_tracking_captura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value
| constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
)

Expand All @@ -95,7 +88,7 @@

bilhetagem_auxiliar_captura = set_default_parameters(
flow=bilhetagem_auxiliar_captura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS,
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value,
)

# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO
Expand Down Expand Up @@ -131,7 +124,8 @@
bilhetagem_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_recaptura = set_default_parameters(
flow=bilhetagem_recaptura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS | {"recapture": True},
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value
| {"recapture": True},
)

# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO
Expand Down
6 changes: 6 additions & 0 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ class constants(Enum): # pylint: disable=c0103
},
}

BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"secret_path": BILHETAGEM_SECRET_PATH,
"source_type": BILHETAGEM_GENERAL_CAPTURE_PARAMS["source_type"],
}

# GTFS
GTFS_DATASET_ID = "br_rj_riodejaneiro_gtfs"

Expand Down
70 changes: 34 additions & 36 deletions pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,43 +473,42 @@ def query_logs(

query = f"""
WITH
t AS (
SELECT
DATETIME(timestamp_array) AS timestamp_array
FROM
UNNEST(
GENERATE_TIMESTAMP_ARRAY(
TIMESTAMP_SUB('{datetime_filter}', INTERVAL {recapture_window_days} day),
TIMESTAMP('{datetime_filter}'),
INTERVAL {interval_minutes} minute) )
AS timestamp_array
WHERE
timestamp_array < '{datetime_filter}' ),
logs_table AS (
t AS (
SELECT
SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura),
"America/Sao_Paulo") AS DATETIME) timestamp_captura,
SAFE_CAST(sucesso AS BOOLEAN) sucesso,
SAFE_CAST(erro AS STRING) erro,
SAFE_CAST(DATA AS DATE) DATA
DATETIME(timestamp_array) AS timestamp_array
FROM
rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t
),
logs AS (
SELECT
*,
TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array
FROM
logs_table
UNNEST(
GENERATE_TIMESTAMP_ARRAY(
TIMESTAMP_SUB('{datetime_filter}', INTERVAL {recapture_window_days} day),
TIMESTAMP('{datetime_filter}'),
INTERVAL {interval_minutes} minute) )
AS timestamp_array
WHERE
DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter}',
INTERVAL {recapture_window_days} day))
AND DATE('{datetime_filter}')
AND timestamp_captura BETWEEN
DATETIME_SUB('{datetime_filter}', INTERVAL {recapture_window_days} day)
AND '{datetime_filter}'
ORDER BY
timestamp_captura )
timestamp_array < '{datetime_filter}' ),
logs_table AS (
SELECT
SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura),
"America/Sao_Paulo") AS DATETIME) timestamp_captura,
SAFE_CAST(sucesso AS BOOLEAN) sucesso,
SAFE_CAST(erro AS STRING) erro,
SAFE_CAST(DATA AS DATE) DATA
FROM
rj-smtr-staging.{dataset_id}_staging.{table_id}_logs AS t
),
logs AS (
SELECT
*,
TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array
FROM
logs_table
WHERE
DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter}',
INTERVAL {recapture_window_days} day))
AND DATE('{datetime_filter}')
AND timestamp_captura BETWEEN
DATETIME_SUB('{datetime_filter}', INTERVAL {recapture_window_days} day)
AND '{datetime_filter}'
)
SELECT
CASE
WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura
Expand All @@ -526,12 +525,11 @@ def query_logs(
logs.timestamp_array = t.timestamp_array
WHERE
logs.sucesso IS NOT TRUE
ORDER BY
timestamp_captura
"""
log(f"Run query to check logs:\n{query}")
results = bd.read_sql(query=query, billing_project_id=bq_project())
if len(results) > 0:
results = results.sort_values(["timestamp_captura"])
results["timestamp_captura"] = (
pd.to_datetime(results["timestamp_captura"])
.dt.tz_localize(constants.TIMEZONE.value)
Expand Down

0 comments on commit a6ced1c

Please sign in to comment.