From 5f28a1b0e4ebb93da6a1682dbc9d8e9575b42382 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Mon, 2 Dec 2024 22:23:23 -0800 Subject: [PATCH] Bugfix/confluence time zone (#3265) * RedisLock typing * checkpoint * put in debug logging * improve comments * mypy fixes --- .../background/celery/apps/app_base.py | 11 ++++--- .../danswer/background/celery/apps/primary.py | 5 +-- backend/danswer/configs/app_configs.py | 16 +++++++++ .../connectors/confluence/connector.py | 11 +++++-- .../connectors/confluence/onyx_confluence.py | 33 +++++++++++++++++++ 5 files changed, 67 insertions(+), 9 deletions(-) diff --git a/backend/danswer/background/celery/apps/app_base.py b/backend/danswer/background/celery/apps/app_base.py index d041ce0d2bc..a92f0c742bd 100644 --- a/backend/danswer/background/celery/apps/app_base.py +++ b/backend/danswer/background/celery/apps/app_base.py @@ -11,6 +11,7 @@ from celery.states import READY_STATES from celery.utils.log import get_task_logger from celery.worker import strategy # type: ignore +from redis.lock import Lock as RedisLock from sentry_sdk.integrations.celery import CeleryIntegration from sqlalchemy import text from sqlalchemy.orm import Session @@ -332,16 +333,16 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None: return logger.info("Releasing primary worker lock.") - lock = sender.primary_worker_lock + lock: RedisLock = sender.primary_worker_lock try: if lock.owned(): try: lock.release() sender.primary_worker_lock = None - except Exception as e: - logger.error(f"Failed to release primary worker lock: {e}") - except Exception as e: - logger.error(f"Failed to check if primary worker lock is owned: {e}") + except Exception: + logger.exception("Failed to release primary worker lock") + except Exception: + logger.exception("Failed to check if primary worker lock is owned") def on_setup_logging( diff --git a/backend/danswer/background/celery/apps/primary.py b/backend/danswer/background/celery/apps/primary.py index 
5efe8300670..44080337650 100644 --- a/backend/danswer/background/celery/apps/primary.py +++ b/backend/danswer/background/celery/apps/primary.py @@ -11,6 +11,7 @@ from celery.signals import worker_init from celery.signals import worker_ready from celery.signals import worker_shutdown +from redis.lock import Lock as RedisLock import danswer.background.celery.apps.app_base as app_base from danswer.background.celery.apps.app_base import task_logger @@ -116,7 +117,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None: # it is planned to use this lock to enforce singleton behavior on the primary # worker, since the primary worker does redis cleanup on startup, but this isn't # implemented yet. - lock = r.lock( + lock: RedisLock = r.lock( DanswerRedisLocks.PRIMARY_WORKER, timeout=CELERY_PRIMARY_WORKER_LOCK_TIMEOUT, ) @@ -227,7 +228,7 @@ def run_periodic_task(self, worker: Any) -> None: if not hasattr(worker, "primary_worker_lock"): return - lock = worker.primary_worker_lock + lock: RedisLock = worker.primary_worker_lock r = get_redis_client(tenant_id=None) diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index 1b630b95300..eda86ecdc79 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -308,6 +308,22 @@ os.environ.get("CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD", 200_000) ) +# Due to breakages in the confluence API, the timezone offset must be specified client side +# to match the user's specified timezone. 
+ +# The current state of affairs: +# CQL queries are parsed in the user's timezone and cannot be specified in UTC +# no API retrieves the user's timezone +# All data is returned in UTC, so we can't derive the user's timezone from that + +# https://community.developer.atlassian.com/t/confluence-cloud-time-zone-get-via-rest-api/35954/16 +# https://jira.atlassian.com/browse/CONFCLOUD-69670 + +# enter as a floating point offset from UTC in hours (-24 < val < 24) +# this will be applied globally, so it probably makes sense to transition this to per +# connector at some point. +CONFLUENCE_TIMEZONE_OFFSET = float(os.environ.get("CONFLUENCE_TIMEZONE_OFFSET", 0.0)) + JIRA_CONNECTOR_LABELS_TO_SKIP = [ ignored_tag for ignored_tag in os.environ.get("JIRA_CONNECTOR_LABELS_TO_SKIP", "").split(",") diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py index e30c85922ce..1869a72b475 100644 --- a/backend/danswer/connectors/confluence/connector.py +++ b/backend/danswer/connectors/confluence/connector.py @@ -1,9 +1,11 @@ from datetime import datetime +from datetime import timedelta from datetime import timezone from typing import Any from urllib.parse import quote from danswer.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP +from danswer.configs.app_configs import CONFLUENCE_TIMEZONE_OFFSET from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE from danswer.configs.app_configs import INDEX_BATCH_SIZE from danswer.configs.constants import DocumentSource @@ -69,6 +71,7 @@ def __init__( # skip it. This is generally used to avoid indexing extra sensitive # pages. 
labels_to_skip: list[str] = CONFLUENCE_CONNECTOR_LABELS_TO_SKIP, + timezone_offset: float = CONFLUENCE_TIMEZONE_OFFSET, ) -> None: self.batch_size = batch_size self.continue_on_failure = continue_on_failure @@ -104,6 +107,8 @@ def __init__( ) self.cql_label_filter = f" and label not in ({comma_separated_labels})" + self.timezone: timezone = timezone(offset=timedelta(hours=timezone_offset)) + @property def confluence_client(self) -> OnyxConfluence: if self._confluence_client is None: @@ -204,12 +209,14 @@ def _fetch_document_batches(self) -> GenerateDocumentsOutput: confluence_page_ids: list[str] = [] page_query = self.cql_page_query + self.cql_label_filter + self.cql_time_filter + logger.debug(f"page_query: {page_query}") # Fetch pages as Documents for page in self.confluence_client.paginated_cql_retrieval( cql=page_query, expand=",".join(_PAGE_EXPANSION_FIELDS), limit=self.batch_size, ): + logger.debug(f"_fetch_document_batches: {page['id']}") confluence_page_ids.append(page["id"]) doc = self._convert_object_to_document(page) if doc is not None: @@ -242,10 +249,10 @@ def load_from_state(self) -> GenerateDocumentsOutput: def poll_source(self, start: float, end: float) -> GenerateDocumentsOutput: # Add time filters - formatted_start_time = datetime.fromtimestamp(start, tz=timezone.utc).strftime( + formatted_start_time = datetime.fromtimestamp(start, tz=self.timezone).strftime( "%Y-%m-%d %H:%M" ) - formatted_end_time = datetime.fromtimestamp(end, tz=timezone.utc).strftime( + formatted_end_time = datetime.fromtimestamp(end, tz=self.timezone).strftime( "%Y-%m-%d %H:%M" ) self.cql_time_filter = f" and lastmodified >= '{formatted_start_time}'" diff --git a/backend/danswer/connectors/confluence/onyx_confluence.py b/backend/danswer/connectors/confluence/onyx_confluence.py index e1542109c42..267c0f9edeb 100644 --- a/backend/danswer/connectors/confluence/onyx_confluence.py +++ b/backend/danswer/connectors/confluence/onyx_confluence.py @@ -134,6 +134,32 @@ def __init__(self, 
url: str, *args: Any, **kwargs: Any) -> None: super(OnyxConfluence, self).__init__(url, *args, **kwargs) self._wrap_methods() + def get_current_user(self, expand: str | None = None) -> Any: + """ + Implements a method that isn't in the third party client. + + Get information about the current user + :param expand: OPTIONAL expand for get status of user. + Possible param is "status". Results are "Active, Deactivated" + :return: Returns the user details + """ + + from atlassian.errors import ApiPermissionError # type:ignore + + url = "rest/api/user/current" + params = {} + if expand: + params["expand"] = expand + try: + response = self.get(url, params=params) + except HTTPError as e: + if e.response.status_code == 403: + raise ApiPermissionError( + "The calling user does not have permission", reason=e + ) + raise + return response + def _wrap_methods(self) -> None: """ For each attribute that is callable (i.e., a method) and doesn't start with an underscore, @@ -306,6 +332,13 @@ def _validate_connector_configuration( ) spaces = confluence_client_with_minimal_retries.get_all_spaces(limit=1) + # uncomment the following for testing + # the following is an attempt to retrieve the user's timezone + # Unfortunately, all data is returned in UTC regardless of the user's time zone + # even though CQL parses incoming times based on the user's time zone + # space_key = spaces["results"][0]["key"] + # space_details = confluence_client_with_minimal_retries.cql(f"space.key={space_key}+AND+type=space") + if not spaces: raise RuntimeError( f"No spaces found at {wiki_base}! "