Adjust the auxiliary bilhetagem recapture logic
pixuimpou committed Oct 23, 2023
1 parent ccddeea commit 830c52f
Showing 4 changed files with 117 additions and 27 deletions.
57 changes: 34 additions & 23 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
@@ -30,7 +30,7 @@
default_materialization_flow,
)

from pipelines.rj_smtr.tasks import get_current_timestamp
from pipelines.rj_smtr.tasks import get_rounded_timestamp, join_dicts

from pipelines.rj_smtr.constants import constants

@@ -114,7 +114,7 @@
) as bilhetagem_transacao_tratamento:
# Configuration #

timestamp = get_current_timestamp()
timestamp = get_rounded_timestamp(interval_minutes=60)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_transacao_tratamento.name + " ",
@@ -123,13 +123,17 @@

LABELS = get_current_flow_labels()

# Recaptures

# Transação recapture
transacao_recapture_params = join_dicts(
original_dict=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
dict_to_join={"timestamp": timestamp},
)
run_recaptura_trasacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
# project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
project_name="staging",
labels=LABELS,
parameters=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
parameters=transacao_recapture_params,
)

wait_recaptura_trasacao = wait_for_flow_run(
@@ -139,34 +143,41 @@
raise_final_state=True,
)

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
# Auxiliary capture

auxiliar_capture_params = join_dicts(
original_dict=constants.BILHETAGEM_CAPTURE_PARAMS.value,
dict_to_join={"timestamp": timestamp},
)
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
# project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
project_name=unmapped("staging"),
parameters=auxiliar_capture_params,
labels=unmapped(LABELS),
)

runs_recaptura_auxiliar.set_upstream(wait_recaptura_trasacao)

wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Capture
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
# Auxiliary recapture

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
# project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
project_name=unmapped("staging"),
parameters=auxiliar_capture_params,
labels=unmapped(LABELS),
)

runs_captura.set_upstream(wait_recaptura_auxiliar)
runs_recaptura_auxiliar.set_upstream(wait_captura)

wait_captura = wait_for_flow_run.map(
runs_captura,
wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
@@ -190,6 +201,6 @@
bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_transacao_tratamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
bilhetagem_transacao_tratamento.schedule = every_hour
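
For readers unfamiliar with the Prefect 1.x pattern used in this flow, the sketch below shows how create_flow_run.map, wait_for_flow_run.map and set_upstream chain sub-flows so that one mapped batch only starts after the previous one finishes. Flow names, project name and parameters are placeholders, not the pipeline's real values, and the exact dependency edges between the transação recapture and the auxiliary capture may sit in a collapsed part of the diff.

```python
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# Illustrative only: placeholder flow/project names and parameters.
with Flow("example_orchestration") as example_flow:
    # Trigger one child run per parameter dict.
    runs_captura = create_flow_run.map(
        flow_name=unmapped("child-capture-flow"),
        project_name=unmapped("staging"),
        parameters=[{"table_id": "linha"}, {"table_id": "grupo"}],
    )
    # Block until every capture run reaches a final state.
    wait_captura = wait_for_flow_run.map(
        runs_captura,
        stream_states=unmapped(True),
        stream_logs=unmapped(True),
        raise_final_state=unmapped(True),
    )
    # Recaptures reuse the same parameters but only start after the waits.
    runs_recaptura = create_flow_run.map(
        flow_name=unmapped("child-recapture-flow"),
        project_name=unmapped("staging"),
        parameters=[{"table_id": "linha"}, {"table_id": "grupo"}],
    )
    runs_recaptura.set_upstream(wait_captura)
```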
4 changes: 4 additions & 0 deletions pipelines/rj_smtr/constants.py
@@ -223,6 +223,7 @@ class constants(Enum): # pylint: disable=c0103
},
"primary_key": ["CD_LINHA"], # id column to nest data on
"interval_minutes": 60,
"truncate_hour": True,
},
{
"table_id": "grupo",
@@ -241,6 +242,7 @@ class constants(Enum): # pylint: disable=c0103
},
"primary_key": ["CD_GRUPO"], # id column to nest data on
"interval_minutes": 60,
"truncate_hour": True,
},
{
"table_id": "grupo_linha",
@@ -259,6 +261,7 @@ class constants(Enum): # pylint: disable=c0103
},
"primary_key": ["CD_GRUPO", "CD_LINHA"],
"interval_minutes": 60,
"truncate_hour": True,
},
{
"table_id": "matriz_integracao",
@@ -280,6 +283,7 @@ class constants(Enum): # pylint: disable=c0103
"cd_integracao",
], # id column to nest data on
"interval_minutes": 60,
"truncate_hour": True,
},
]

10 changes: 8 additions & 2 deletions pipelines/rj_smtr/flows.py
@@ -26,7 +26,7 @@
from pipelines.rj_smtr.tasks import (
create_date_hour_partition,
create_local_partition_path,
get_current_timestamp,
get_rounded_timestamp,
parse_timestamp_to_string,
transform_raw_to_nested_structure,
create_dbt_run_vars,
@@ -52,6 +52,7 @@
table_id = Parameter("table_id", default=None)
dataset_id = Parameter("dataset_id", default=None)
partition_date_only = Parameter("partition_date_only", default=None)
timestamp = Parameter("timestamp", default=None)

# Capture parameters #
extract_params = Parameter("extract_params", default=None)
@@ -70,16 +71,21 @@
checkpoint=False,
)

current_timestamp = get_rounded_timestamp(
timestamp=timestamp, interval_minutes=interval_minutes
)

with case(recapture, True):
_, recapture_timestamps, recapture_previous_errors = query_logs(
dataset_id=dataset_id,
table_id=table_id,
datetime_filter=current_timestamp,
interval_minutes=interval_minutes,
recapture_window_days=recapture_window_days,
)

with case(recapture, False):
capture_timestamp = [get_current_timestamp()]
capture_timestamp = [current_timestamp]
capture_previous_errors = task(
lambda: [None], checkpoint=False, name="assign_none_to_previous_errors"
)()
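
As a rough sketch of the parameter plumbing introduced here (dict shapes assumed, not copied from the repo): the parent treatment flow rounds the timestamp once, join_dicts merges it into each sub-flow's parameter dict, and the generic capture flow above parses the ISO string back into a datetime, so parent and children share the same reference instant.

```python
from datetime import datetime

# Hypothetical parameter dict as it would arrive in the child flow.
child_params = {
    "table_id": "linha",
    "interval_minutes": 60,
    "timestamp": "2023-10-23T14:00:00",  # injected by the parent via join_dicts
}

timestamp = child_params.get("timestamp")
if isinstance(timestamp, str):
    # Mirrors the parsing done by get_rounded_timestamp.
    timestamp = datetime.fromisoformat(timestamp)
print(timestamp)  # 2023-10-23 14:00:00 -> same reference instant as the parent
```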
73 changes: 71 additions & 2 deletions pipelines/rj_smtr/tasks.py
@@ -243,13 +243,54 @@ def create_dbt_run_vars(


@task
def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime:
def get_rounded_timestamp(
timestamp: Union[str, datetime, None] = None,
interval_minutes: Union[int, None] = None,
) -> datetime:
"""
Calculate rounded timestamp for flow run.
Args:
timestamp (Union[str, datetime, None]): timestamp to be used as reference
interval_minutes (Union[int, None], optional): interval in minutes to which the timestamp is rounded
Returns:
datetime: timestamp for flow run
"""
if isinstance(timestamp, str):
timestamp = datetime.fromisoformat(timestamp)

if not timestamp:
timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))

timestamp = timestamp.replace(second=0, microsecond=0)

if interval_minutes:
if interval_minutes >= 60:
hours = interval_minutes / 60
interval_minutes = round(((hours) % 1) * 60)

if interval_minutes == 0:
rounded_minutes = interval_minutes
else:
rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes

timestamp = timestamp.replace(minute=rounded_minutes)

return timestamp


@task
def get_current_timestamp(
timestamp=None, truncate_minute: bool = True, truncate_hour: bool = False
) -> datetime:
"""
Get current timestamp for flow run.
Args:
timestamp: timestamp to be used as reference (optionally, it can be a string)
truncate_minute: whether to truncate the timestamp to the minute or not
truncate_hour: whether to truncate the timestamp to the hour or not
Returns:
datetime: timestamp for flow run
@@ -259,7 +300,9 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime:
if not timestamp:
timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))
if truncate_minute:
return timestamp.replace(second=0, microsecond=0)
timestamp = timestamp.replace(second=0, microsecond=0)
if truncate_hour:
timestamp = timestamp.replace(minute=0)
return timestamp
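
A minimal, standalone sketch of the rounding rule implemented by get_rounded_timestamp, restated here as a plain function so it can be run outside Prefect: intervals of 60 minutes or more are reduced to their sub-hour remainder, so interval_minutes=60 truncates to the hour, while interval_minutes=90 snaps the minutes to a 30-minute grid.

```python
from datetime import datetime

def round_timestamp(ts: datetime, interval_minutes: int) -> datetime:
    """Plain re-statement of the rounding logic above, for illustration only."""
    ts = ts.replace(second=0, microsecond=0)
    if interval_minutes >= 60:
        # Keep only the sub-hour remainder: 60 -> 0, 90 -> 30, 120 -> 0.
        interval_minutes = round((interval_minutes / 60 % 1) * 60)
    if interval_minutes == 0:
        minute = 0
    else:
        minute = (ts.minute // interval_minutes) * interval_minutes
    return ts.replace(minute=minute)

print(round_timestamp(datetime(2023, 10, 23, 14, 47), 60))  # 2023-10-23 14:00:00
print(round_timestamp(datetime(2023, 10, 23, 14, 47), 90))  # 2023-10-23 14:30:00
print(round_timestamp(datetime(2023, 10, 23, 14, 47), 10))  # 2023-10-23 14:40:00
```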


@@ -385,6 +428,7 @@ def query_logs(
max_recaptures (int, optional): maximum number of recaptures to be done
interval_minutes (int, optional): interval in minutes between each recapture
recapture_window_days (int, optional): Number of days to query for errors
truncate_hour: whether to truncate the timestamp to the hour or not
Returns:
lists: errors (bool),
Expand Down Expand Up @@ -1265,3 +1309,28 @@ def unpack_mapped_results_nout2(
"""
return [r[0] for r in mapped_results], [r[1] for r in mapped_results]


@task(checkpoint=False)
def join_dicts(
original_dict: Union[dict, list[dict]], dict_to_join: dict
) -> Union[dict, list[dict]]:
"""
Task to join a dict or list of dicts with another dict
Args:
original_dict (Union[dict, list[dict]]): The input dict or list of dicts
dict_to_join (dict): The dict to be joined with original_dict
Returns:
Union[dict, list[dict]]: The joined value
"""

if isinstance(original_dict, list):
return [d | dict_to_join for d in original_dict]
elif isinstance(original_dict, dict):
return original_dict | dict_to_join
else:
raise ValueError(
f"original_dict must be dict or list, received: {type(original_dict)}"
)
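
The join_dicts task relies on dict union (PEP 584, Python 3.9+). A short usage sketch, with parameter shapes assumed rather than copied from the repo:

```python
# Hypothetical capture-params list; only the merge behaviour matters here.
capture_params = [
    {"table_id": "linha", "interval_minutes": 60},
    {"table_id": "grupo", "interval_minutes": 60},
]
timestamp = "2023-10-23T14:00:00"

# `d | other` returns a new dict; keys from `other` win on conflict.
joined = [d | {"timestamp": timestamp} for d in capture_params]
# [{'table_id': 'linha', 'interval_minutes': 60, 'timestamp': '2023-10-23T14:00:00'},
#  {'table_id': 'grupo', 'interval_minutes': 60, 'timestamp': '2023-10-23T14:00:00'}]
```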
