Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cria flow de captura de dados de tracking do Jae #532

Merged
merged 168 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
168 commits
Select commit Hold shift + click to select a range
b6089fa
remove task de particao nao usada
fernandascovino Sep 25, 2023
dc197cc
unifica tasks de particao de data e hora
fernandascovino Sep 25, 2023
66e84a1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 25, 2023
7cb436b
corrige condicional
fernandascovino Sep 25, 2023
8903027
Merge branch 'master' into hotfix-smtr-partition-date-only
mergify[bot] Sep 26, 2023
588fe7d
change capture flow
pixuimpou Sep 26, 2023
da92c19
Merge branch 'master' into hotfix-smtr-partition-date-only
mergify[bot] Sep 26, 2023
97746e1
change generic capture flow
pixuimpou Sep 26, 2023
7a11896
Merge branch 'master' into staging/flow-captura-generico
fernandascovino Sep 26, 2023
b62473c
Merge branch 'hotfix-smtr-partition-date-only' into staging/flow-capt…
fernandascovino Sep 26, 2023
6f12477
atualiza esquema do flow padrao
fernandascovino Sep 26, 2023
0c3df1b
change default capture flow structure
pixuimpou Sep 27, 2023
f6ca7ab
change generic capture flow
pixuimpou Sep 27, 2023
fa17be2
adjust constant structure
pixuimpou Sep 27, 2023
bdc3881
change bilhetagem to new capture flow structure
pixuimpou Sep 27, 2023
fc61c47
fix get_storage_blob function
pixuimpou Sep 27, 2023
0fc26cb
fix get_storage_blob call
pixuimpou Sep 27, 2023
634df85
organize constants order
pixuimpou Sep 27, 2023
bda52aa
fix get_raw_from_sources function call
pixuimpou Sep 27, 2023
b2548d6
change transform_raw_to_json to read_raw_data
pixuimpou Sep 27, 2023
307863a
transform transform_raw_data_to_json to read_raw_data
pixuimpou Sep 27, 2023
7f2c1e3
fix nout task parameter
pixuimpou Sep 27, 2023
51977c1
fix timedelta instantiation
pixuimpou Sep 27, 2023
8ef0b5d
set upstream tasks
pixuimpou Sep 27, 2023
4f21f0a
declare raw_filepath
pixuimpou Sep 27, 2023
11b9735
update docstrings
eng-rodrigocunha Sep 27, 2023
f484b88
adjust get_raw_from_sources return
pixuimpou Sep 27, 2023
50626a9
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Sep 27, 2023
2df4318
fix errors
pixuimpou Sep 27, 2023
df6525a
change agent label to dev
pixuimpou Sep 27, 2023
2983b68
refactore source values
pixuimpou Sep 28, 2023
2c78b09
update constants
eng-rodrigocunha Sep 28, 2023
1f3c2fc
update agent
eng-rodrigocunha Sep 28, 2023
702e70d
update schedule params
eng-rodrigocunha Sep 28, 2023
b5712d2
update interval
eng-rodrigocunha Sep 28, 2023
e3df22c
fix get_datetime_range interval
eng-rodrigocunha Sep 28, 2023
6ed06da
remove order by from queries
eng-rodrigocunha Sep 28, 2023
822c59f
fix get_raw_data_api
eng-rodrigocunha Sep 28, 2023
f3ddf24
Merge branch 'master' into staging/flow-captura-generico
mergify[bot] Sep 28, 2023
14dd234
Merge branch 'master' into staging/flow-captura-generico
mergify[bot] Sep 28, 2023
c58ea96
change json read function
pixuimpou Sep 28, 2023
045a423
update read_raw_data
lingsv Sep 28, 2023
d2d188f
update save_raw_local_func
lingsv Sep 28, 2023
b7c4e2f
log error
pixuimpou Sep 28, 2023
2bedf89
change raw api extraction for json
pixuimpou Sep 28, 2023
20b48df
change read json function
pixuimpou Sep 28, 2023
42c6ac0
print log traceback
pixuimpou Sep 28, 2023
2527604
skip pre treatment if empty df
pixuimpou Sep 29, 2023
0f907b9
skip save staging if dataframe is empty / save raw
pixuimpou Sep 29, 2023
ba1dad2
remove skip upload if empty dataframe
pixuimpou Sep 29, 2023
4c3d1cf
update docstring and returned values
pixuimpou Sep 29, 2023
39e8606
reorganize task order
pixuimpou Sep 29, 2023
465ee52
fix tuple
pixuimpou Sep 29, 2023
67a1056
change zip logic
pixuimpou Sep 29, 2023
3f5f34c
remove skip
pixuimpou Sep 29, 2023
7860a4b
create gtfs zip constant
pixuimpou Sep 29, 2023
2d7c9cb
add gtfs zip file name
pixuimpou Sep 29, 2023
bfa6273
add csv to save raw / change filetype logic
pixuimpou Sep 29, 2023
524cd07
remove comments
pixuimpou Sep 29, 2023
3477a2c
fix csv_args default value
pixuimpou Sep 29, 2023
16e61c8
change docstring get raw api
pixuimpou Sep 29, 2023
4bdaa4f
change raw data gcs docstring
pixuimpou Sep 29, 2023
e3b7c14
remove commented task
pixuimpou Sep 29, 2023
0935cbd
change quadro primary key to list
pixuimpou Sep 29, 2023
e5bad98
update GTFS constants
lingsv Sep 29, 2023
d4230bb
change upload folder structure
pixuimpou Sep 29, 2023
759fb00
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Sep 29, 2023
c5c1028
Merge branch 'master' of https://github.com/prefeitura-rio/pipelines …
pixuimpou Sep 29, 2023
8074135
Merge branch 'master' into staging/flow-captura-generico
fernandascovino Sep 29, 2023
7c43d1d
undo silenciamento de falha de notificação
fernandascovino Sep 29, 2023
5c189e3
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Sep 29, 2023
089e933
adicionar partition date only na transacao
pixuimpou Sep 29, 2023
350fbdf
Merge branch 'master' into staging/flow-captura-generico
fernandascovino Sep 29, 2023
685aae5
remove parametros de testes (gtfs)
fernandascovino Sep 29, 2023
cd5048e
Update pipelines/rj_smtr/constants.py
eng-rodrigocunha Sep 29, 2023
d17d161
corrige encadeamento de erros no flow
fernandascovino Sep 29, 2023
02b948a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 29, 2023
fac7821
remove header treatment
pixuimpou Oct 2, 2023
e291e51
mudar agent dev para prd
pixuimpou Oct 2, 2023
e57d457
mudar agent de dev para prd
pixuimpou Oct 2, 2023
3767a56
ajustar retorno das funcoes
pixuimpou Oct 2, 2023
6564ea6
Atualiza documentação
eng-rodrigocunha Oct 2, 2023
19bb0be
adicionar retorno em get_upload_storage_blob
pixuimpou Oct 2, 2023
d5168c7
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Oct 2, 2023
bc87f44
Atualiza documentação
eng-rodrigocunha Oct 2, 2023
185d695
Atualiza string
eng-rodrigocunha Oct 2, 2023
4a975d5
adiciona recaptura no flow generico
pixuimpou Oct 10, 2023
eecaf6c
Merge branch 'master' of https://github.com/prefeitura-rio/pipelines …
pixuimpou Oct 10, 2023
33a23e5
alterar labels para dev
pixuimpou Oct 11, 2023
0eb4e92
adicionar logica de recaptura
pixuimpou Oct 11, 2023
ecc67d1
criar conexão com banco de dados
pixuimpou Oct 11, 2023
2a88286
criar conexão com banco de dados
pixuimpou Oct 11, 2023
ae1c882
cria função para map de multiplos retornos
pixuimpou Oct 11, 2023
22bb4ce
remover unmapped dos filepaths
pixuimpou Oct 11, 2023
8cb4404
log para debbug
pixuimpou Oct 11, 2023
e8d9fb7
retirar unmapped das partições
pixuimpou Oct 11, 2023
cb7e7e5
adicionar unmapped no parametro recapture
pixuimpou Oct 11, 2023
59789ab
adicionar psycopg2
pixuimpou Oct 16, 2023
60b1a93
comentários dos parametros
pixuimpou Oct 16, 2023
ff77973
adicionar conexão com postgresql
pixuimpou Oct 16, 2023
b084387
mudar bilhetagem para extrair do db
pixuimpou Oct 16, 2023
032763c
padronizar nomenclatura dos argumentos
pixuimpou Oct 16, 2023
ffb2051
mudar label schedule para dev
pixuimpou Oct 16, 2023
10911c6
corrigir constante db bilhetagem postgresql
pixuimpou Oct 16, 2023
7e51e69
alterar nomeação para runs de recaptura
pixuimpou Oct 16, 2023
e256f12
ajuste connector
pixuimpou Oct 16, 2023
c67a93e
alterar IP para DNS
pixuimpou Oct 17, 2023
a5d342c
Serialize datetime objects / read sql with pandas
pixuimpou Oct 17, 2023
16ffff3
mudar logica do nome da run
pixuimpou Oct 17, 2023
55fbe34
cria recaptura bilhetagem
pixuimpou Oct 17, 2023
db6e6d9
mudar host para IP / adiciona interval_minutes
pixuimpou Oct 17, 2023
d115126
adiciona parametro interval minutes
pixuimpou Oct 17, 2023
97c865a
remove linha comentada
pixuimpou Oct 17, 2023
0bf3ade
remove arquivo de schedules da bilhetagem
pixuimpou Oct 17, 2023
35c80d4
generaliza função query logs
pixuimpou Oct 17, 2023
a59e353
ajuste remove schedule personalizado
pixuimpou Oct 17, 2023
2616565
unmap interval_minutes
pixuimpou Oct 17, 2023
0696626
alteração de pasta de gravação para teste
pixuimpou Oct 17, 2023
ee0c440
teste retirar timezone
pixuimpou Oct 18, 2023
a8bb7f1
mudar timezone
pixuimpou Oct 18, 2023
d956a53
corrigir logica de recaptura
pixuimpou Oct 18, 2023
2261952
adicionar possibilidade de recapturar mais dias
pixuimpou Oct 18, 2023
b8ac6b8
ajustar recapture_window_days default
pixuimpou Oct 18, 2023
7f0c309
adicionae recapture_window na task query_logs
pixuimpou Oct 18, 2023
ae17746
merge previous_errors
pixuimpou Oct 18, 2023
b172a63
remover log de teste
pixuimpou Oct 18, 2023
0bfe9cf
ajustar log recaptura
pixuimpou Oct 18, 2023
7ca3764
adicionar recaptura auxiliar
pixuimpou Oct 18, 2023
c5f369f
criar parametros recaptura tabelas auxiliares
pixuimpou Oct 18, 2023
e75e7a6
comentar materializacao
pixuimpou Oct 18, 2023
6b6d0cb
teste log
pixuimpou Oct 18, 2023
ec23cf6
muda logica recaptura bilhetagem
pixuimpou Oct 18, 2023
1644b72
unmapped upstream tasks
pixuimpou Oct 18, 2023
a33a4b8
mudar forma de upstream
pixuimpou Oct 18, 2023
e47ff2d
remover alterações de teste
pixuimpou Oct 19, 2023
ba730a4
mudar agent para prd
pixuimpou Oct 19, 2023
4421043
corrigir project_name
pixuimpou Oct 19, 2023
f1a3bbd
passar tirar query_logs_func
pixuimpou Oct 19, 2023
517a9a2
corrigir project_name
pixuimpou Oct 19, 2023
df6ee96
remover comentários
pixuimpou Oct 19, 2023
7b2e1df
remover query_logs_func
pixuimpou Oct 19, 2023
5f30384
aumentar max_recaptures
pixuimpou Oct 19, 2023
431f004
adiciona extracao tracking
pixuimpou Oct 19, 2023
f855caa
Merge branch 'master' into staging/flow-captura-generico
pixuimpou Oct 19, 2023
e7ca572
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 19, 2023
dae77e8
muda agent para dev
pixuimpou Oct 19, 2023
c722638
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Oct 19, 2023
5fb96e6
corrige constante
pixuimpou Oct 19, 2023
7c3de1a
formatar constante database
pixuimpou Oct 19, 2023
de24539
altera nome do flow
pixuimpou Oct 19, 2023
cdfffd2
Merge branch 'master' into staging/flow-captura-generico
mergify[bot] Oct 20, 2023
087972b
Merge branch 'master' into staging/flow-captura-generico
mergify[bot] Oct 20, 2023
ab5f6aa
Merge branch 'master' of https://github.com/prefeitura-rio/pipelines …
pixuimpou Oct 23, 2023
ccddeea
alterar queries bilhetagem auxiliar
pixuimpou Oct 23, 2023
830c52f
ajuste na logica de recaptura bilhetagem auxiliar
pixuimpou Oct 23, 2023
89c4dff
Merge branch 'hotfix/flow-captura-bilhetagem' of https://github.com/p…
pixuimpou Oct 23, 2023
5ffc2cd
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Oct 23, 2023
d4e16db
remover parametro timestamp
pixuimpou Oct 23, 2023
af89e2f
remove truncate hour
pixuimpou Oct 23, 2023
ddffd6c
mudar agent para prd
pixuimpou Oct 23, 2023
8d48597
Merge branch 'master' into staging/flow-captura-generico
mergify[bot] Oct 23, 2023
a4660d1
mudar project name
pixuimpou Oct 23, 2023
49b3618
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Oct 23, 2023
5f3596b
criar constante interval
pixuimpou Oct 23, 2023
9a05708
criar recaptura gps
pixuimpou Oct 23, 2023
b5a403d
corrigir docstring
pixuimpou Oct 23, 2023
165f9ab
alterar comentario recaptura
pixuimpou Oct 23, 2023
e8711b6
voltar task get_current_timestamp
pixuimpou Oct 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 67 additions & 15 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
default_materialization_flow,
)

from pipelines.rj_smtr.tasks import get_current_timestamp
from pipelines.rj_smtr.tasks import get_rounded_timestamp

from pipelines.rj_smtr.constants import constants

Expand Down Expand Up @@ -63,6 +63,23 @@

bilhetagem_transacao_captura.schedule = every_minute

# BILHETAGEM GPS

bilhetagem_tracking_captura = deepcopy(default_capture_flow)
bilhetagem_tracking_captura.name = "SMTR: Bilhetagem GPS Validador - Captura"
bilhetagem_tracking_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_tracking_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_tracking_captura = set_default_parameters(
flow=bilhetagem_tracking_captura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
| constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
)

bilhetagem_tracking_captura.schedule = every_minute

# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #

Expand Down Expand Up @@ -114,7 +131,9 @@
) as bilhetagem_transacao_tratamento:
# Configuração #

timestamp = get_current_timestamp()
timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_transacao_tratamento.name + " ",
Expand All @@ -123,7 +142,7 @@

LABELS = get_current_flow_labels()

# Recapturas
# Recaptura Transação

run_recaptura_trasacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
Expand All @@ -139,34 +158,35 @@
raise_final_state=True,
)

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
# Captura Auxiliar

runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

runs_recaptura_auxiliar.set_upstream(wait_recaptura_trasacao)

wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Captura
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
# Recaptura Auxiliar

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

runs_captura.set_upstream(wait_recaptura_auxiliar)
runs_recaptura_auxiliar.set_upstream(wait_captura)

wait_captura = wait_for_flow_run.map(
runs_captura,
wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
Expand All @@ -193,3 +213,35 @@
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_transacao_tratamento.schedule = every_hour


with Flow(
"SMTR: Bilhetagem GPS Validador - Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as bilhetagem_gps_tratamento:
timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_transacao_tratamento.name + " ",
now_time=timestamp,
)

LABELS = get_current_flow_labels()

# Recaptura GPS

run_recaptura_gps = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
)

wait_recaptura_gps = wait_for_flow_run(
run_recaptura_gps,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
49 changes: 41 additions & 8 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class constants(Enum): # pylint: disable=c0103
"engine": "postgresql",
"host": "10.5.115.1",
},
"tracking_db": {
"engine": "postgresql",
"host": "10.5.15.25",
},
},
"source_type": "db",
}
Expand All @@ -203,8 +207,29 @@ class constants(Enum): # pylint: disable=c0103
"interval_minutes": 1,
}

BILHETAGEM_TRACKING_CAPTURE_PARAMS = {
"table_id": "gps_validador",
"partition_date_only": False,
"extract_params": {
"database": "tracking_db",
"query": """
SELECT
*
FROM
tracking_detalhe
WHERE
data_tracking BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["id"],
"interval_minutes": 1,
}

BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"

BILHETAGEM_TRATAMENTO_INTERVAL = 60

BILHETAGEM_CAPTURE_PARAMS = [
{
"table_id": "linha",
Expand All @@ -217,11 +242,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
LINHA
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_LINHA"], # id column to nest data on
"interval_minutes": 60,
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"truncate_hour": True,
},
{
"table_id": "grupo",
Expand All @@ -234,11 +261,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
GRUPO
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_GRUPO"], # id column to nest data on
"interval_minutes": 60,
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"truncate_hour": True,
},
{
"table_id": "grupo_linha",
Expand All @@ -251,11 +280,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
GRUPO_LINHA
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_GRUPO", "CD_LINHA"],
"interval_minutes": 60,
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"truncate_hour": True,
},
{
"table_id": "matriz_integracao",
Expand All @@ -268,14 +299,16 @@ class constants(Enum): # pylint: disable=c0103
FROM
matriz_integracao
WHERE
dt_inclusao >= '{start}'
dt_inclusao BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": [
"cd_versao_matriz",
"cd_integracao",
], # id column to nest data on
"interval_minutes": 60,
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"truncate_hour": True,
},
]

Expand Down
7 changes: 5 additions & 2 deletions pipelines/rj_smtr/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from pipelines.rj_smtr.tasks import (
create_date_hour_partition,
create_local_partition_path,
get_current_timestamp,
get_rounded_timestamp,
parse_timestamp_to_string,
transform_raw_to_nested_structure,
create_dbt_run_vars,
Expand Down Expand Up @@ -70,16 +70,19 @@
checkpoint=False,
)

current_timestamp = get_rounded_timestamp(interval_minutes=interval_minutes)

with case(recapture, True):
_, recapture_timestamps, recapture_previous_errors = query_logs(
dataset_id=dataset_id,
table_id=table_id,
datetime_filter=current_timestamp,
interval_minutes=interval_minutes,
recapture_window_days=recapture_window_days,
)

with case(recapture, False):
capture_timestamp = [get_current_timestamp()]
capture_timestamp = [current_timestamp]
capture_previous_errors = task(
lambda: [None], checkpoint=False, name="assign_none_to_previous_errors"
)()
Expand Down
39 changes: 38 additions & 1 deletion pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,44 @@ def create_dbt_run_vars(
###############


@task
def get_rounded_timestamp(
timestamp: Union[str, datetime, None] = None,
interval_minutes: Union[int, None] = None,
) -> datetime:
"""
Calculate rounded timestamp for flow run.

Args:
timestamp (Union[str, datetime, None]): timestamp to be used as reference
interval_minutes (Union[int, None], optional): interval in minutes between each recapture

Returns:
datetime: timestamp for flow run
"""
if isinstance(timestamp, str):
timestamp = datetime.fromisoformat(timestamp)

if not timestamp:
timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))

timestamp = timestamp.replace(second=0, microsecond=0)

if interval_minutes:
if interval_minutes >= 60:
hours = interval_minutes / 60
interval_minutes = round(((hours) % 1) * 60)

if interval_minutes == 0:
rounded_minutes = interval_minutes
else:
rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes

timestamp = timestamp.replace(minute=rounded_minutes)

return timestamp


@task
def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime:
"""
Expand All @@ -260,7 +298,6 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datet
timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))
if truncate_minute:
return timestamp.replace(second=0, microsecond=0)
return timestamp


@task
Expand Down
Loading