Skip to content

Commit

Permalink
adiciona funcoes redis
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Jan 7, 2025
1 parent f664a5c commit 4ef85e1
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions pipelines/capture/templates/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pipelines.constants import constants
from pipelines.utils.gcp.bigquery import BQTable, Dataset
from pipelines.utils.gcp.storage import Storage
from pipelines.utils.prefect import flow_is_running_local
from pipelines.utils.utils import convert_timezone, cron_date_range, cron_get_last_date


Expand Down Expand Up @@ -366,6 +367,11 @@ def __init__( # pylint: disable=R0913
)
self.max_capture_hours = max_capture_hours

def _get_redis_client(self):
if flow_is_running_local():
return get_redis_client(host="localhost")
return get_redis_client()

def get_last_captured_datetime(self, env: str = None) -> datetime:
"""
Pega o último datetime materializado no Redis
Expand All @@ -377,8 +383,8 @@ def get_last_captured_datetime(self, env: str = None) -> datetime:
datetime: a data vinda do Redis
"""
env = env or self.env
redis_key = f"{env}.{self.dataset_id}"
redis_client = get_redis_client()
redis_key = f"{env}.{self.dataset_id}.{self.table_id}"
redis_client = self._get_redis_client()
content = redis_client.get(redis_key)
last_datetime = (
self.first_timestamp
Expand All @@ -389,7 +395,9 @@ def get_last_captured_datetime(self, env: str = None) -> datetime:
)
)

return convert_timezone(timestamp=last_datetime)
last_datetime = convert_timezone(timestamp=last_datetime)

return last_datetime

def get_capture_date_range(self, timestamp: datetime) -> dict:
date_range_start = self.get_last_captured_datetime()
Expand Down Expand Up @@ -419,22 +427,24 @@ def set_redis_last_captured_datetime(self, timestamp: datetime):
timestamp (datetime): data a ser salva no Redis
"""
value = timestamp.strftime(constants.MATERIALIZATION_LAST_RUN_PATTERN.value)
redis_key = f"{self.env}.{self.dataset_id}"
log(f"Saving timestamp {value} on key: {redis_key}")
redis_client = get_redis_client()
redis_key = f"{self.env}.{self.dataset_id}.{self.table_id}"
redis_client = self._get_redis_client()
content = redis_client.get(redis_key)
log(f"Salvando timestamp {value} na key: {redis_key}")
if not content:
content = {constants.REDIS_LAST_MATERIALIZATION_TS_KEY.value: value}
redis_client.set(redis_key, content)
log("Timestamp salva!")
else:
if (
convert_timezone(
datetime.strptime(
content[constants.REDIS_LAST_MATERIALIZATION_TS_KEY.value],
constants.MATERIALIZATION_LAST_RUN_PATTERN.value,
)
last_timestamp = convert_timezone(
datetime.strptime(
content[constants.REDIS_LAST_MATERIALIZATION_TS_KEY.value],
constants.MATERIALIZATION_LAST_RUN_PATTERN.value,
)
< timestamp
):
)
log(f"Última timestamp salva no Redis: {last_timestamp}")

if last_timestamp < timestamp:
content[constants.REDIS_LAST_MATERIALIZATION_TS_KEY.value] = value
redis_client.set(redis_key, content)
log("Timestamp salva!")

0 comments on commit 4ef85e1

Please sign in to comment.