Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Altera regra timestamp do flow generico de captura #537

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
default_materialization_flow,
)

from pipelines.rj_smtr.tasks import get_current_timestamp
from pipelines.rj_smtr.tasks import get_rounded_timestamp, join_dicts

from pipelines.rj_smtr.constants import constants

Expand Down Expand Up @@ -114,7 +114,7 @@
) as bilhetagem_transacao_tratamento:
# Configuração #

timestamp = get_current_timestamp()
timestamp = get_rounded_timestamp(interval_minutes=60)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_transacao_tratamento.name + " ",
Expand All @@ -123,13 +123,17 @@

LABELS = get_current_flow_labels()

# Recapturas

# Recaptura Transação
transacao_recapture_params = join_dicts(
original_dict=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
dict_to_join={"timestamp": timestamp},
)
run_recaptura_trasacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
# project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
project_name="staging",
labels=LABELS,
parameters=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
parameters=transacao_recapture_params,
)

wait_recaptura_trasacao = wait_for_flow_run(
Expand All @@ -139,34 +143,41 @@
raise_final_state=True,
)

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
# Captura Auxiliar

auxiliar_capture_params = join_dicts(
original_dict=constants.BILHETAGEM_CAPTURE_PARAMS.value,
dict_to_join={"timestamp": timestamp},
)
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
# project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
project_name=unmapped("staging"),
parameters=auxiliar_capture_params,
labels=unmapped(LABELS),
)

runs_recaptura_auxiliar.set_upstream(wait_recaptura_trasacao)

wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Captura
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
# Recaptura Auxiliar

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
# project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
project_name=unmapped("staging"),
parameters=auxiliar_capture_params,
labels=unmapped(LABELS),
)

runs_captura.set_upstream(wait_recaptura_auxiliar)
runs_recaptura_auxiliar.set_upstream(wait_captura)

wait_captura = wait_for_flow_run.map(
runs_captura,
wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
Expand All @@ -190,6 +201,6 @@
bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_transacao_tratamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
bilhetagem_transacao_tratamento.schedule = every_hour
16 changes: 12 additions & 4 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
LINHA
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_LINHA"], # id column to nest data on
"interval_minutes": 60,
"truncate_hour": True,
},
{
"table_id": "grupo",
Expand All @@ -234,11 +236,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
GRUPO
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_GRUPO"], # id column to nest data on
"interval_minutes": 60,
"truncate_hour": True,
},
{
"table_id": "grupo_linha",
Expand All @@ -251,11 +255,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
GRUPO_LINHA
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_GRUPO", "CD_LINHA"],
"interval_minutes": 60,
"truncate_hour": True,
},
{
"table_id": "matriz_integracao",
Expand All @@ -268,14 +274,16 @@ class constants(Enum): # pylint: disable=c0103
FROM
matriz_integracao
WHERE
dt_inclusao >= '{start}'
dt_inclusao BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": [
"cd_versao_matriz",
"cd_integracao",
], # id column to nest data on
"interval_minutes": 60,
"truncate_hour": True,
},
]

Expand Down
10 changes: 8 additions & 2 deletions pipelines/rj_smtr/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from pipelines.rj_smtr.tasks import (
create_date_hour_partition,
create_local_partition_path,
get_current_timestamp,
get_rounded_timestamp,
parse_timestamp_to_string,
transform_raw_to_nested_structure,
create_dbt_run_vars,
Expand All @@ -52,6 +52,7 @@
table_id = Parameter("table_id", default=None)
dataset_id = Parameter("dataset_id", default=None)
partition_date_only = Parameter("partition_date_only", default=None)
timestamp = Parameter("timestamp", default=None)

# Parâmetros Captura #
extract_params = Parameter("extract_params", default=None)
Expand All @@ -70,16 +71,21 @@
checkpoint=False,
)

current_timestamp = get_rounded_timestamp(
timestamp=timestamp, interval_minutes=interval_minutes
)

with case(recapture, True):
_, recapture_timestamps, recapture_previous_errors = query_logs(
dataset_id=dataset_id,
table_id=table_id,
datetime_filter=current_timestamp,
interval_minutes=interval_minutes,
recapture_window_days=recapture_window_days,
)

with case(recapture, False):
capture_timestamp = [get_current_timestamp()]
capture_timestamp = [current_timestamp]
capture_previous_errors = task(
lambda: [None], checkpoint=False, name="assign_none_to_previous_errors"
)()
Expand Down
73 changes: 71 additions & 2 deletions pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,54 @@ def create_dbt_run_vars(


@task
def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime:
def get_rounded_timestamp(
timestamp: Union[str, datetime, None] = None,
interval_minutes: Union[int, None] = None,
) -> datetime:
"""
Calculate rounded timestamp for flow run.

Args:
timestamp (Union[str, datetime, None]): timestamp to be used as reference
interval_minutes (Union[int, None], optional): interval in minutes between each recapture

Returns:
datetime: timestamp for flow run
"""
if isinstance(timestamp, str):
timestamp = datetime.fromisoformat(timestamp)

if not timestamp:
timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))

timestamp = timestamp.replace(second=0, microsecond=0)

if interval_minutes:
if interval_minutes >= 60:
hours = interval_minutes / 60
interval_minutes = round(((hours) % 1) * 60)

if interval_minutes == 0:
rounded_minutes = interval_minutes
else:
rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes

timestamp = timestamp.replace(minute=rounded_minutes)

return timestamp


@task
def get_current_timestamp(
timestamp=None, truncate_minute: bool = True, truncate_hour: bool = False
) -> datetime:
"""
Get current timestamp for flow run.

Args:
timestamp: timestamp to be used as reference (optionally, it can be a string)
truncate_minute: whether to truncate the timestamp to the minute or not
truncate_hour: whether to truncate the timestamp to the hour or not

Returns:
datetime: timestamp for flow run
Expand All @@ -259,7 +300,9 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datet
if not timestamp:
timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))
if truncate_minute:
return timestamp.replace(second=0, microsecond=0)
timestamp = timestamp.replace(second=0, microsecond=0)
if truncate_hour:
timestamp = timestamp.replace(minute=0)
return timestamp


Expand Down Expand Up @@ -385,6 +428,7 @@ def query_logs(
max_recaptures (int, optional): maximum number of recaptures to be done
interval_minutes (int, optional): interval in minutes between each recapture
recapture_window_days (int, optional): Number of days to query for erros
truncate_hour: whether to truncate the timestamp to the hour or not

Returns:
lists: errors (bool),
Expand Down Expand Up @@ -1265,3 +1309,28 @@ def unpack_mapped_results_nout2(

"""
return [r[0] for r in mapped_results], [r[1] for r in mapped_results]


@task(checkpoint=False)
def join_dicts(
original_dict: Union[dict, list[dict]], dict_to_join: dict
) -> Union[dict, list[dict]]:
"""
Task to join a dict or list of dicts with another dict

Args:
original_dict (Union[dict, list[dict]]): The input dict or list of dicts
dict_to_join (dict): The dict to be joined with original_dict

Returns:
Union[dict, list[dict]]: The joined value
"""

if isinstance(original_dict, list):
return [d | dict_to_join for d in original_dict]
elif isinstance(original_dict, dict):
return original_dict | dict_to_join
else:
raise ValueError(
f"original_dict must be dict or list, received: {type(original_dict)}"
)