From 4ef85e1211f3f632c795b43cf4b5448d121aa1f7 Mon Sep 17 00:00:00 2001 From: Rafael Pinheiro Date: Tue, 7 Jan 2025 15:04:36 -0300 Subject: [PATCH] adiciona funcoes redis --- pipelines/capture/templates/utils.py | 38 ++++++++++++++++++---------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/pipelines/capture/templates/utils.py b/pipelines/capture/templates/utils.py index cfe2b4d5..42b04cfb 100644 --- a/pipelines/capture/templates/utils.py +++ b/pipelines/capture/templates/utils.py @@ -13,6 +13,7 @@ from pipelines.constants import constants from pipelines.utils.gcp.bigquery import BQTable, Dataset from pipelines.utils.gcp.storage import Storage +from pipelines.utils.prefect import flow_is_running_local from pipelines.utils.utils import convert_timezone, cron_date_range, cron_get_last_date @@ -366,6 +367,11 @@ def __init__( # pylint: disable=R0913 ) self.max_capture_hours = max_capture_hours + def _get_redis_client(self): + if flow_is_running_local(): + return get_redis_client(host="localhost") + return get_redis_client() + def get_last_captured_datetime(self, env: str = None) -> datetime: """ Pega o último datetime materializado no Redis @@ -377,8 +383,8 @@ def get_last_captured_datetime(self, env: str = None) -> datetime: datetime: a data vinda do Redis """ env = env or self.env - redis_key = f"{env}.{self.dataset_id}" - redis_client = get_redis_client() + redis_key = f"{env}.{self.dataset_id}.{self.table_id}" + redis_client = self._get_redis_client() content = redis_client.get(redis_key) last_datetime = ( self.first_timestamp @@ -389,7 +395,9 @@ def get_last_captured_datetime(self, env: str = None) -> datetime: ) ) - return convert_timezone(timestamp=last_datetime) + last_datetime = convert_timezone(timestamp=last_datetime) + + return last_datetime def get_capture_date_range(self, timestamp: datetime) -> dict: date_range_start = self.get_last_captured_datetime() @@ -419,22 +427,24 @@ def set_redis_last_captured_datetime(self, timestamp: datetime): timestamp (datetime): data a ser salva no Redis """ value = timestamp.strftime(constants.MATERIALIZATION_LAST_RUN_PATTERN.value) - redis_key = f"{self.env}.{self.dataset_id}" - log(f"Saving timestamp {value} on key: {redis_key}") - redis_client = get_redis_client() + redis_key = f"{self.env}.{self.dataset_id}.{self.table_id}" + redis_client = self._get_redis_client() content = redis_client.get(redis_key) + log(f"Salvando timestamp {value} na key: {redis_key}") if not content: content = {constants.REDIS_LAST_MATERIALIZATION_TS_KEY.value: value} redis_client.set(redis_key, content) + log("Timestamp salva!") else: - if ( - convert_timezone( - datetime.strptime( - content[constants.REDIS_LAST_MATERIALIZATION_TS_KEY.value], - constants.MATERIALIZATION_LAST_RUN_PATTERN.value, - ) + last_timestamp = convert_timezone( + datetime.strptime( + content[constants.REDIS_LAST_MATERIALIZATION_TS_KEY.value], + constants.MATERIALIZATION_LAST_RUN_PATTERN.value, ) - < timestamp - ): + ) + log(f"Última timestamp salva no Redis: {last_timestamp}") + + if last_timestamp < timestamp: content[constants.REDIS_LAST_MATERIALIZATION_TS_KEY.value] = value redis_client.set(redis_key, content) + log("Timestamp salva!")