[WIP][gps_sppo] Changes recapture pipeline parameters #527

Closed · wants to merge 3 commits
pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py (6 additions, 1 deletion)
@@ -282,10 +282,13 @@


 with Flow(
-    "SMTR - GPS SPPO Recapturas", code_owners=["caio", "fernanda", "boris", "rodrigo"]
+    "SMTR - GPS SPPO Recapturas",
+    code_owners=["rodrigo"],  # "caio", "fernanda", "boris"
 ) as recaptura:
     version = Parameter("version", default=2)
     datetime_filter = Parameter("datetime_filter", default=None)
+    previous_days = Parameter("previous_days", default=1)
+    max_recaptures = Parameter("max_recaptures", default=1440)
     materialize = Parameter("materialize", default=True)
     # SETUP #
     LABELS = get_current_flow_labels()
@@ -294,6 +297,8 @@
         dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
         table_id=constants.GPS_SPPO_RAW_TABLE_ID.value,
         datetime_filter=datetime_filter,
+        previous_days=previous_days,
+        max_recaptures=max_recaptures,
     )

     rename_flow_run = rename_current_flow_run_now_time(
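The two new Parameters default to a one-day lookback with one recapture slot per minute. As a hedged sketch, assuming the Prefect 1.x flow.run API this repo builds on, a wider backfill could be triggered locally like this (parameter values are illustrative, not from the PR):

    from pipelines.rj_smtr.br_rj_riodejaneiro_onibus_gps.flows import recaptura

    # Look back 7 days instead of the default 1; the cap scales with the
    # minute-level capture cadence so a full week of slots fits.
    state = recaptura.run(
        parameters={
            "previous_days": 7,
            "max_recaptures": 7 * 24 * 60,
            "materialize": False,
        }
    )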
pipelines/rj_smtr/tasks.py (10 additions, 8 deletions)
@@ -11,7 +11,6 @@
 from typing import Dict, List, Union, Iterable
 import io

-from basedosdados import Storage, Table
 import basedosdados as bd
 from dbt_client import DbtClient
 import pandas as pd
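The dropped name imports pair with the call-site edits further down: every basedosdados entry point now goes through the bd module namespace, so the external dependency is visibly qualified at each use. A minimal sketch of the pattern (dataset and table names are placeholders, not values from the repo):

    import basedosdados as bd

    st_obj = bd.Storage(dataset_id="example_dataset", table_id="example_table")
    tb_obj = bd.Table(dataset_id="example_dataset", table_id="example_table")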
@@ -271,7 +270,8 @@ def query_logs(
     dataset_id: str,
     table_id: str,
     datetime_filter=None,
-    max_recaptures: int = 60,
+    max_recaptures: int = 1440,
+    previous_days: int = 1,
 ):
     """
     Queries capture logs to check for errors
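The new default is not arbitrary: captures land once per minute, so a one-day window holds 24 * 60 = 1440 candidate timestamps, while the old cap of 60 could only re-request the last hour. A one-line sanity check of that arithmetic:

    assert 24 * 60 == 1440  # minute-level slots in previous_days=1 day of captures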
@@ -302,7 +302,8 @@
             datetime(timestamp_array) as timestamp_array
         from
             unnest(GENERATE_TIMESTAMP_ARRAY(
-                timestamp_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day),
+                timestamp_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}',
+                    interval {previous_days} day),
                 timestamp('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'),
                 interval 1 minute)
             ) as timestamp_array
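For intuition, GENERATE_TIMESTAMP_ARRAY builds a minute-by-minute grid from datetime_filter minus previous_days up to datetime_filter, against which captured timestamps are later compared. A local sketch of the same grid with pandas (illustrative values, not pipeline code):

    from datetime import datetime, timedelta
    import pandas as pd

    datetime_filter = datetime(2022, 9, 1, 12, 0, 0)
    previous_days = 2

    grid = pd.date_range(
        start=datetime_filter - timedelta(days=previous_days),
        end=datetime_filter,
        freq="min",  # one expected capture per minute
    )
    assert len(grid) == previous_days * 24 * 60 + 1  # both endpoints included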
@@ -317,11 +318,12 @@
         where
             data between
                 date(datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}',
-                    interval 1 day))
+                    interval {previous_days} day))
                 and date('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}')
             and
                 timestamp_captura between
-                    datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}', interval 1 day)
+                    datetime_sub('{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}',
+                        interval {previous_days} day)
                 and '{datetime_filter.strftime('%Y-%m-%d %H:%M:%S')}'
             order by timestamp_captura
         )
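Because the interval is interpolated into an f-string, one concrete rendering may help. A hedged sketch of how the rewritten date filter comes out for previous_days=3 (values are illustrative):

    from datetime import datetime

    datetime_filter = datetime(2022, 9, 1, 12, 0, 0)
    previous_days = 3
    ts = datetime_filter.strftime("%Y-%m-%d %H:%M:%S")

    clause = (
        f"data between "
        f"date(datetime_sub('{ts}', interval {previous_days} day)) "
        f"and date('{ts}')"
    )
    # data between date(datetime_sub('2022-09-01 12:00:00', interval 3 day))
    # and date('2022-09-01 12:00:00')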
@@ -599,7 +601,7 @@ def bq_upload(
     try:
         # Upload raw to staging
         if raw_filepath:
-            st_obj = Storage(table_id=table_id, dataset_id=dataset_id)
+            st_obj = bd.Storage(table_id=table_id, dataset_id=dataset_id)
             log(
                 f"""Uploading raw file to bucket {st_obj.bucket_name} at
                 {st_obj.bucket_name}/{dataset_id}/{table_id}"""
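This call site and the one in upload_raw_data_to_gcs below now construct the client identically. As a sketch of the surrounding upload step, where the upload call and its arguments are my assumption about the basedosdados API (only the constructor and bucket_name appear in this diff):

    import basedosdados as bd

    st_obj = bd.Storage(dataset_id="example_dataset", table_id="example_table")
    # Destination mirrors the log message: {bucket_name}/{dataset_id}/{table_id}
    st_obj.upload("data/raw/example.json", mode="raw")  # mode value is assumed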
@@ -745,7 +747,7 @@ def upload_raw_data_to_gcs(
     """
     if error is None:
         try:
-            st_obj = Storage(table_id=table_id, dataset_id=dataset_id)
+            st_obj = bd.Storage(table_id=table_id, dataset_id=dataset_id)
             log(
                 f"""Uploading raw file to bucket {st_obj.bucket_name} at
                 {st_obj.bucket_name}/{dataset_id}/{table_id}"""
@@ -857,7 +859,7 @@ def get_materialization_date_range(  # pylint: disable=R0913
     # if there's no timestamp set on redis, get max timestamp on source table
     if last_run is None:
         log("Failed to fetch key from Redis...\n Querying tables for last succeeded run")
-        if Table(dataset_id=dataset_id, table_id=table_id).table_exists("prod"):
+        if bd.Table(dataset_id=dataset_id, table_id=table_id).table_exists("prod"):
             last_run = get_table_min_max_value(
                 query_project_id=bq_project(),
                 dataset_id=dataset_id,