diff --git a/rj_cor/meteorologia/precipitacao_alertario/schedules.html b/rj_cor/meteorologia/precipitacao_alertario/schedules.html index b042150e2..b62e2226d 100644 --- a/rj_cor/meteorologia/precipitacao_alertario/schedules.html +++ b/rj_cor/meteorologia/precipitacao_alertario/schedules.html @@ -43,7 +43,7 @@
pipelines.rj_cor.meteorologia.precipitacao_alerta
minute_schedule = Schedule(
clocks=[
IntervalClock(
- interval=timedelta(minutes=5),
+ interval=timedelta(minutes=3),
start_date=datetime(2021, 1, 1, 0, 1, 0),
labels=[
constants.RJ_COR_AGENT_LABEL.value,
diff --git a/rj_cor/meteorologia/precipitacao_alertario/tasks.html b/rj_cor/meteorologia/precipitacao_alertario/tasks.html
index 90e80d4ec..618971f7a 100644
--- a/rj_cor/meteorologia/precipitacao_alertario/tasks.html
+++ b/rj_cor/meteorologia/precipitacao_alertario/tasks.html
@@ -53,6 +53,7 @@ Module pipelines.rj_cor.meteorologia.precipitacao_alerta
from pipelines.utils.utils import (
build_redis_key,
compare_dates_between_tables_redis,
+ get_redis_output,
log,
to_partitions,
save_str_on_redis,
@@ -230,9 +231,13 @@ Module pipelines.rj_cor.meteorologia.precipitacao_alerta
"""
Save on dbt last timestamp where it was updated
"""
- now = pendulum.now("America/Sao_Paulo").to_datetime_string()
+ last_update_key = build_redis_key(
+ dataset_id, table_id, name="last_update", mode=mode
+ )
+ last_update = get_redis_output(last_update_key)
redis_key = build_redis_key(dataset_id, table_id, name="dbt_last_update", mode=mode)
- save_str_on_redis(redis_key, "date", now)
+ log(f"Saving {last_update} as last time dbt was updated")
+ save_str_on_redis(redis_key, "date", last_update["date"])
@task(skip_on_upstream_skip=False)
@@ -361,9 +366,13 @@ Functions
"""
Save on dbt last timestamp where it was updated
"""
- now = pendulum.now("America/Sao_Paulo").to_datetime_string()
+ last_update_key = build_redis_key(
+ dataset_id, table_id, name="last_update", mode=mode
+ )
+ last_update = get_redis_output(last_update_key)
redis_key = build_redis_key(dataset_id, table_id, name="dbt_last_update", mode=mode)
- save_str_on_redis(redis_key, "date", now)
+ log(f"Saving {last_update} as last time dbt was updated")
+ save_str_on_redis(redis_key, "date", last_update["date"])
diff --git a/utils/utils.html b/utils/utils.html
index 2bf2a947e..b68b432fa 100644
--- a/utils/utils.html
+++ b/utils/utils.html
@@ -991,6 +991,18 @@ Module pipelines.utils.utils
redis_client.hset(redis_key, key, value)
+def get_redis_output(redis_key):
+ """
+ Get Redis output
+ Example: {b'date': b'2023-02-27 07:29:04'}
+ """
+ redis_client = get_redis_client()
+ output = redis_client.hgetall(redis_key)
+ if len(output) > 0:
+ output = treat_redis_output(output)
+ return output
+
+
def treat_redis_output(text):
"""
Redis returns a dict where both key and value are byte string
@@ -1010,23 +1022,20 @@ Module pipelines.utils.utils
table is bigger then the first one
"""
- redis_client = get_redis_client()
-
# get saved date on redis
- date_1 = redis_client.hgetall(key_table_1)
- date_2 = redis_client.hgetall(key_table_2)
+ date_1 = get_redis_output(key_table_1)
+ date_2 = get_redis_output(key_table_2)
+ # Return true if there is no date_1 or date_2 saved on redis
if (len(date_1) == 0) | (len(date_2) == 0):
return True
- date_1 = treat_redis_output(date_1)
- date_2 = treat_redis_output(date_2)
-
# Convert date to pendulum
date_1 = pendulum.from_format(date_1["date"], format_date_table_1)
date_2 = pendulum.from_format(date_2["date"], format_date_table_2)
-
- return date_1 < date_2
+ comparison = date_1 < date_2
+ log(f"Is {date_2} bigger than {date_1}? {comparison}")
+ return comparison
# pylint: disable=W0106
@@ -1232,23 +1241,20 @@ Functions
table is bigger then the first one
"""
- redis_client = get_redis_client()
-
# get saved date on redis
- date_1 = redis_client.hgetall(key_table_1)
- date_2 = redis_client.hgetall(key_table_2)
+ date_1 = get_redis_output(key_table_1)
+ date_2 = get_redis_output(key_table_2)
+ # Return true if there is no date_1 or date_2 saved on redis
if (len(date_1) == 0) | (len(date_2) == 0):
return True
- date_1 = treat_redis_output(date_1)
- date_2 = treat_redis_output(date_2)
-
# Convert date to pendulum
date_1 = pendulum.from_format(date_1["date"], format_date_table_1)
date_2 = pendulum.from_format(date_2["date"], format_date_table_2)
-
- return date_1 < date_2
+ comparison = date_1 < date_2
+ log(f"Is {date_2} bigger than {date_1}? {comparison}")
+ return comparison
@@ -1523,6 +1529,28 @@ Returns
)
+
+def get_redis_output(redis_key)
+
Get Redis output +Example: {b'date': b'2023-02-27 07:29:04'}
def get_redis_output(redis_key):
+ """
+ Get Redis output
+ Example: {b'date': b'2023-02-27 07:29:04'}
+ """
+ redis_client = get_redis_client()
+ output = redis_client.hgetall(redis_key)
+ if len(output) > 0:
+ output = treat_redis_output(output)
+ return output
+
def get_storage_blobs(dataset_id: str, table_id: str, mode: str = 'staging') ‑> list
final_column_treatment
get_credentials_from_env
get_redis_client
get_redis_output
get_storage_blobs
get_username_and_password_from_secret
get_vault_client