diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py
index 9c93f93f99b..ea6a5eecb37 100644
--- a/backend/danswer/connectors/confluence/connector.py
+++ b/backend/danswer/connectors/confluence/connector.py
@@ -3,6 +3,8 @@
 from typing import Any
 from urllib.parse import quote
 
+from atlassian import Confluence  # type: ignore
+
 from danswer.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP
 from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
 from danswer.configs.app_configs import INDEX_BATCH_SIZE
@@ -70,7 +72,7 @@ def __init__(
     ) -> None:
         self.batch_size = batch_size
         self.continue_on_failure = continue_on_failure
-        self.confluence_client: OnyxConfluence | None = None
+        self._confluence_client: OnyxConfluence | None = None
         self.is_cloud = is_cloud
 
         # Remove trailing slash from wiki_base if present
@@ -97,39 +99,59 @@ def __init__(
         self.cql_label_filter = ""
         if labels_to_skip:
             labels_to_skip = list(set(labels_to_skip))
-            comma_separated_labels = ",".join(f"'{label}'" for label in labels_to_skip)
+            comma_separated_labels = ",".join(
+                f"'{quote(label)}'" for label in labels_to_skip
+            )
             self.cql_label_filter = f" and label not in ({comma_separated_labels})"
 
+    @property
+    def confluence_client(self) -> OnyxConfluence:
+        if self._confluence_client is None:
+            raise ConnectorMissingCredentialError("Confluence")
+        return self._confluence_client
+
     def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
         # see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py
         # for a list of other hidden constructor args
-        self.confluence_client = build_confluence_client(
+        self._confluence_client = build_confluence_client(
             credentials_json=credentials,
             is_cloud=self.is_cloud,
             wiki_base=self.wiki_base,
         )
+
+        client_without_retries = Confluence(
+            api_version="cloud" if self.is_cloud else "latest",
+            url=self.wiki_base.rstrip("/"),
+            username=credentials["confluence_username"] if self.is_cloud else None,
+            password=credentials["confluence_access_token"] if self.is_cloud else None,
+            token=credentials["confluence_access_token"] if not self.is_cloud else None,
+        )
+        spaces = client_without_retries.get_all_spaces(limit=1)
+        if not spaces:
+            raise RuntimeError(
+                f"No spaces found at {self.wiki_base}! "
+                "Check your credentials and wiki_base and make sure "
+                "is_cloud is set correctly."
+            )
         return None
 
     def _get_comment_string_for_page_id(self, page_id: str) -> str:
-        if self.confluence_client is None:
-            raise ConnectorMissingCredentialError("Confluence")
-
         comment_string = ""
 
         comment_cql = f"type=comment and container='{page_id}'"
         comment_cql += self.cql_label_filter
 
         expand = ",".join(_COMMENT_EXPANSION_FIELDS)
-        for comments in self.confluence_client.paginated_cql_page_retrieval(
+        for comment in self.confluence_client.paginated_cql_retrieval(
             cql=comment_cql,
             expand=expand,
         ):
-            for comment in comments:
-                comment_string += "\nComment:\n"
-                comment_string += extract_text_from_confluence_html(
-                    confluence_client=self.confluence_client,
-                    confluence_object=comment,
-                )
+            comment_string += "\nComment:\n"
+            comment_string += extract_text_from_confluence_html(
+                confluence_client=self.confluence_client,
+                confluence_object=comment,
+                fetched_titles=set(),
+            )
 
         return comment_string
 
@@ -141,28 +163,30 @@ def _convert_object_to_document(
         If its a page, it extracts the text, adds the comments for the document text.
         If its an attachment, it just downloads the attachment and converts that into a document.
         """
-        if self.confluence_client is None:
-            raise ConnectorMissingCredentialError("Confluence")
-
         # The url and the id are the same
         object_url = build_confluence_document_id(
-            self.wiki_base, confluence_object["_links"]["webui"]
+            base_url=self.wiki_base,
+            content_url=confluence_object["_links"]["webui"],
+            is_cloud=self.is_cloud,
         )
 
         object_text = None
         # Extract text from page
         if confluence_object["type"] == "page":
             object_text = extract_text_from_confluence_html(
-                self.confluence_client, confluence_object
+                confluence_client=self.confluence_client,
+                confluence_object=confluence_object,
+                fetched_titles={confluence_object.get("title", "")},
             )
             # Add comments to text
             object_text += self._get_comment_string_for_page_id(confluence_object["id"])
         elif confluence_object["type"] == "attachment":
             object_text = attachment_to_content(
-                self.confluence_client, confluence_object
+                confluence_client=self.confluence_client, attachment=confluence_object
             )
 
         if object_text is None:
+            # This only happens for attachments that are not parseable
             return None
 
         # Get space name
@@ -193,44 +217,39 @@ def _convert_object_to_document(
         )
 
     def _fetch_document_batches(self) -> GenerateDocumentsOutput:
-        if self.confluence_client is None:
-            raise ConnectorMissingCredentialError("Confluence")
-
         doc_batch: list[Document] = []
         confluence_page_ids: list[str] = []
 
         page_query = self.cql_page_query + self.cql_label_filter + self.cql_time_filter
         # Fetch pages as Documents
-        for page_batch in self.confluence_client.paginated_cql_page_retrieval(
+        for page in self.confluence_client.paginated_cql_retrieval(
            cql=page_query,
             expand=",".join(_PAGE_EXPANSION_FIELDS),
             limit=self.batch_size,
         ):
-            for page in page_batch:
-                confluence_page_ids.append(page["id"])
-                doc = self._convert_object_to_document(page)
-                if doc is not None:
-                    doc_batch.append(doc)
-                if len(doc_batch) >= self.batch_size:
-                    yield doc_batch
-                    doc_batch = []
+            confluence_page_ids.append(page["id"])
+            doc = self._convert_object_to_document(page)
+            if doc is not None:
+                doc_batch.append(doc)
+            if len(doc_batch) >= self.batch_size:
+                yield doc_batch
+                doc_batch = []
 
         # Fetch attachments as Documents
         for confluence_page_id in confluence_page_ids:
             attachment_cql = f"type=attachment and container='{confluence_page_id}'"
             attachment_cql += self.cql_label_filter
             # TODO: maybe should add time filter as well?
-            for attachments in self.confluence_client.paginated_cql_page_retrieval(
+            for attachment in self.confluence_client.paginated_cql_retrieval(
                 cql=attachment_cql,
                 expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
             ):
-                for attachment in attachments:
-                    doc = self._convert_object_to_document(attachment)
-                    if doc is not None:
-                        doc_batch.append(doc)
-                    if len(doc_batch) >= self.batch_size:
-                        yield doc_batch
-                        doc_batch = []
+                doc = self._convert_object_to_document(attachment)
+                if doc is not None:
+                    doc_batch.append(doc)
+                if len(doc_batch) >= self.batch_size:
+                    yield doc_batch
+                    doc_batch = []
 
         if doc_batch:
             yield doc_batch
@@ -255,48 +274,47 @@ def retrieve_all_slim_documents(
         start: SecondsSinceUnixEpoch | None = None,
         end: SecondsSinceUnixEpoch | None = None,
     ) -> GenerateSlimDocumentOutput:
-        if self.confluence_client is None:
-            raise ConnectorMissingCredentialError("Confluence")
-
         doc_metadata_list: list[SlimDocument] = []
 
         restrictions_expand = ",".join(_RESTRICTIONS_EXPANSION_FIELDS)
 
         page_query = self.cql_page_query + self.cql_label_filter
-        for pages in self.confluence_client.cql_paginate_all_expansions(
+        for page in self.confluence_client.cql_paginate_all_expansions(
             cql=page_query,
             expand=restrictions_expand,
         ):
-            for page in pages:
-                # If the page has restrictions, add them to the perm_sync_data
-                # These will be used by doc_sync.py to sync permissions
-                perm_sync_data = {
-                    "restrictions": page.get("restrictions", {}),
-                    "space_key": page.get("space", {}).get("key"),
-                }
-
+            # If the page has restrictions, add them to the perm_sync_data
+            # These will be used by doc_sync.py to sync permissions
+            perm_sync_data = {
+                "restrictions": page.get("restrictions", {}),
+                "space_key": page.get("space", {}).get("key"),
+            }
+
+            doc_metadata_list.append(
+                SlimDocument(
+                    id=build_confluence_document_id(
+                        self.wiki_base,
+                        page["_links"]["webui"],
+                        self.is_cloud,
+                    ),
+                    perm_sync_data=perm_sync_data,
+                )
+            )
+            attachment_cql = f"type=attachment and container='{page['id']}'"
+            attachment_cql += self.cql_label_filter
+            for attachment in self.confluence_client.cql_paginate_all_expansions(
+                cql=attachment_cql,
+                expand=restrictions_expand,
+            ):
                 doc_metadata_list.append(
                     SlimDocument(
                         id=build_confluence_document_id(
-                            self.wiki_base, page["_links"]["webui"]
+                            self.wiki_base,
+                            attachment["_links"]["webui"],
+                            self.is_cloud,
                         ),
                         perm_sync_data=perm_sync_data,
                     )
                 )
-                attachment_cql = f"type=attachment and container='{page['id']}'"
-                attachment_cql += self.cql_label_filter
-                for attachments in self.confluence_client.cql_paginate_all_expansions(
-                    cql=attachment_cql,
-                    expand=restrictions_expand,
-                ):
-                    for attachment in attachments:
-                        doc_metadata_list.append(
-                            SlimDocument(
-                                id=build_confluence_document_id(
-                                    self.wiki_base, attachment["_links"]["webui"]
-                                ),
-                                perm_sync_data=perm_sync_data,
-                            )
-                        )
-                yield doc_metadata_list
-                doc_metadata_list = []
+            yield doc_metadata_list
+            doc_metadata_list = []
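Note on the connector changes above: they invert the old batching relationship. The client methods now yield individual Confluence objects, and the connector re-batches them itself before yielding `doc_batch`. A minimal sketch of that re-batching pattern, under hypothetical names (`batch_objects` is not part of this PR):

```python
from collections.abc import Iterator
from typing import Any


def batch_objects(
    objects: Iterator[dict[str, Any]], batch_size: int
) -> Iterator[list[dict[str, Any]]]:
    """Group a flat stream of objects into fixed-size batches."""
    batch: list[dict[str, Any]] = []
    for obj in objects:
        batch.append(obj)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        # flush the final partial batch, like the trailing
        # `if doc_batch: yield doc_batch` in _fetch_document_batches
        yield batch


# prints [0, 1, 2] then [3, 4, 5] then [6]
for chunk in batch_objects(({"id": i} for i in range(7)), batch_size=3):
    print([obj["id"] for obj in chunk])
```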
diff --git a/backend/danswer/connectors/confluence/onyx_confluence.py b/backend/danswer/connectors/confluence/onyx_confluence.py
index c01f45dea6a..eeb7e7158f9 100644
--- a/backend/danswer/connectors/confluence/onyx_confluence.py
+++ b/backend/danswer/connectors/confluence/onyx_confluence.py
@@ -20,6 +20,10 @@
 
 RATE_LIMIT_MESSAGE_LOWERCASE = "Rate limit exceeded".lower()
 
+# https://jira.atlassian.com/browse/CONFCLOUD-76433
+_PROBLEMATIC_EXPANSIONS = "body.storage.value"
+_REPLACEMENT_EXPANSIONS = "body.view.value"
+
 
 class ConfluenceRateLimitError(Exception):
     pass
@@ -80,7 +84,7 @@ def handle_confluence_rate_limit(confluence_call: F) -> F:
     def wrapped_call(*args: list[Any], **kwargs: Any) -> Any:
         MAX_RETRIES = 5
 
-        TIMEOUT = 3600
+        TIMEOUT = 600
         timeout_at = time.monotonic() + TIMEOUT
 
         for attempt in range(MAX_RETRIES):
@@ -88,13 +92,16 @@ def wrapped_call(*args: list[Any], **kwargs: Any) -> Any:
                 raise TimeoutError(
                     f"Confluence call attempts took longer than {TIMEOUT} seconds."
                 )
-
             try:
                 # we're relying more on the client to rate limit itself
                 # and applying our own retries in a more specific set of circumstances
                 return confluence_call(*args, **kwargs)
             except HTTPError as e:
                 delay_until = _handle_http_error(e, attempt)
+                logger.warning(
+                    "HTTPError in confluence call. "
+                    f"Retrying in {delay_until - time.monotonic():.1f} seconds..."
+                )
                 while time.monotonic() < delay_until:
                     # in the future, check a signal here to exit
                     time.sleep(1)
@@ -103,7 +110,6 @@ def wrapped_call(*args: list[Any], **kwargs: Any) -> Any:
                 # Users reported it to be intermittent, so just retry
                 if attempt == MAX_RETRIES - 1:
                     raise e
-
                 logger.exception(
                     "Confluence Client raised an AttributeError. Retrying..."
                 )
@@ -141,7 +147,7 @@ def _wrap_methods(self) -> None:
 
     def _paginate_url(
         self, url_suffix: str, limit: int | None = None
-    ) -> Iterator[list[dict[str, Any]]]:
+    ) -> Iterator[dict[str, Any]]:
         """
         This will paginate through the top level query.
         """
@@ -153,46 +159,43 @@ def _paginate_url(
 
         while url_suffix:
             try:
+                logger.debug(f"Making confluence call to {url_suffix}")
                 next_response = self.get(url_suffix)
             except Exception as e:
-                logger.exception("Error in danswer_cql: \n")
-                raise e
-            yield next_response.get("results", [])
-            url_suffix = next_response.get("_links", {}).get("next")
+                logger.warning(f"Error in confluence call to {url_suffix}")
 
-    def paginated_groups_retrieval(
-        self,
-        limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
-        return self._paginate_url("rest/api/group", limit)
+                # If the problematic expansion is in the url, replace it
+                # with the replacement expansion and try again
+                # If that fails, raise the error
+                if _PROBLEMATIC_EXPANSIONS not in url_suffix:
+                    logger.exception(f"Error in confluence call to {url_suffix}")
+                    raise e
+                logger.warning(
+                    f"Replacing {_PROBLEMATIC_EXPANSIONS} with {_REPLACEMENT_EXPANSIONS}"
+                    " and trying again."
+                )
+                url_suffix = url_suffix.replace(
+                    _PROBLEMATIC_EXPANSIONS,
+                    _REPLACEMENT_EXPANSIONS,
+                )
+                continue
 
-    def paginated_group_members_retrieval(
-        self,
-        group_name: str,
-        limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
-        group_name = quote(group_name)
-        return self._paginate_url(f"rest/api/group/{group_name}/member", limit)
+            # yield the results individually
+            yield from next_response.get("results", [])
 
-    def paginated_cql_user_retrieval(
-        self,
-        cql: str,
-        expand: str | None = None,
-        limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
-        expand_string = f"&expand={expand}" if expand else ""
-        return self._paginate_url(
-            f"rest/api/search/user?cql={cql}{expand_string}", limit
-        )
+            url_suffix = next_response.get("_links", {}).get("next")
 
-    def paginated_cql_page_retrieval(
+    def paginated_cql_retrieval(
         self,
         cql: str,
         expand: str | None = None,
         limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
+    ) -> Iterator[dict[str, Any]]:
+        """
+        The content/search endpoint can be used to fetch pages, attachments, and comments.
+ """ expand_string = f"&expand={expand}" if expand else "" - return self._paginate_url( + yield from self._paginate_url( f"rest/api/content/search?cql={cql}{expand_string}", limit ) @@ -201,7 +204,7 @@ def cql_paginate_all_expansions( cql: str, expand: str | None = None, limit: int | None = None, - ) -> Iterator[list[dict[str, Any]]]: + ) -> Iterator[dict[str, Any]]: """ This function will paginate through the top level query first, then paginate through all of the expansions. @@ -221,6 +224,44 @@ def _traverse_and_update(data: dict | list) -> None: for item in data: _traverse_and_update(item) - for results in self.paginated_cql_page_retrieval(cql, expand, limit): - _traverse_and_update(results) - yield results + for confluence_object in self.paginated_cql_retrieval(cql, expand, limit): + _traverse_and_update(confluence_object) + yield confluence_object + + def paginated_cql_user_retrieval( + self, + cql: str, + expand: str | None = None, + limit: int | None = None, + ) -> Iterator[dict[str, Any]]: + """ + The search/user endpoint can be used to fetch users. + It's a seperate endpoint from the content/search endpoint used only for users. + Otherwise it's very similar to the content/search endpoint. + """ + expand_string = f"&expand={expand}" if expand else "" + yield from self._paginate_url( + f"rest/api/search/user?cql={cql}{expand_string}", limit + ) + + def paginated_groups_retrieval( + self, + limit: int | None = None, + ) -> Iterator[dict[str, Any]]: + """ + This is not an SQL like query. + It's a confluence specific endpoint that can be used to fetch groups. + """ + yield from self._paginate_url("rest/api/group", limit) + + def paginated_group_members_retrieval( + self, + group_name: str, + limit: int | None = None, + ) -> Iterator[dict[str, Any]]: + """ + This is not an SQL like query. + It's a confluence specific endpoint that can be used to fetch the members of a group. 
+ """ + group_name = quote(group_name) + yield from self._paginate_url(f"rest/api/group/{group_name}/member", limit) diff --git a/backend/danswer/connectors/confluence/utils.py b/backend/danswer/connectors/confluence/utils.py index beb0465be60..cb5253f4c14 100644 --- a/backend/danswer/connectors/confluence/utils.py +++ b/backend/danswer/connectors/confluence/utils.py @@ -2,6 +2,7 @@ from datetime import datetime from datetime import timezone from typing import Any +from urllib.parse import quote import bs4 @@ -71,7 +72,9 @@ def _get_user(confluence_client: OnyxConfluence, user_id: str) -> str: def extract_text_from_confluence_html( - confluence_client: OnyxConfluence, confluence_object: dict[str, Any] + confluence_client: OnyxConfluence, + confluence_object: dict[str, Any], + fetched_titles: set[str], ) -> str: """Parse a Confluence html page and replace the 'user Id' by the real User Display Name @@ -79,7 +82,7 @@ def extract_text_from_confluence_html( Args: confluence_object (dict): The confluence object as a dict confluence_client (Confluence): Confluence client - + fetched_titles (set[str]): The titles of the pages that have already been fetched Returns: str: loaded and formated Confluence page """ @@ -100,6 +103,73 @@ def extract_text_from_confluence_html( continue # Include @ sign for tagging, more clear for LLM user.replaceWith("@" + _get_user(confluence_client, user_id)) + + for html_page_reference in soup.findAll("ac:structured-macro"): + # Here, we only want to process page within page macros + if html_page_reference.attrs.get("ac:name") != "include": + continue + + page_data = html_page_reference.find("ri:page") + if not page_data: + logger.warning( + f"Skipping retrieval of {html_page_reference} because because page data is missing" + ) + continue + + page_title = page_data.attrs.get("ri:content-title") + if not page_title: + # only fetch pages that have a title + logger.warning( + f"Skipping retrieval of {html_page_reference} because it has no title" + ) + continue + + if page_title in fetched_titles: + # prevent recursive fetching of pages + logger.debug(f"Skipping {page_title} because it has already been fetched") + continue + + fetched_titles.add(page_title) + + # Wrap this in a try-except because there are some pages that might not exist + try: + page_query = f"type=page and title='{quote(page_title)}'" + + page_contents: dict[str, Any] | None = None + # Confluence enforces title uniqueness, so we should only get one result here + for page in confluence_client.paginated_cql_retrieval( + cql=page_query, + expand="body.storage.value", + limit=1, + ): + page_contents = page + break + except Exception as e: + logger.warning( + f"Error getting page contents for object {confluence_object}: {e}" + ) + continue + + if not page_contents: + continue + + text_from_page = extract_text_from_confluence_html( + confluence_client=confluence_client, + confluence_object=page_contents, + fetched_titles=fetched_titles, + ) + + html_page_reference.replaceWith(text_from_page) + + for html_link_body in soup.findAll("ac:link-body"): + # This extracts the text from inline links in the page so they can be + # represented in the document text as plain text + try: + text_from_link = html_link_body.text + html_link_body.replaceWith(f"(LINK TEXT: {text_from_link})") + except Exception as e: + logger.warning(f"Error processing ac:link-body: {e}") + return format_document_soup(soup) @@ -153,7 +223,9 @@ def attachment_to_content( return extracted_text -def build_confluence_document_id(base_url: str, 
diff --git a/backend/ee/danswer/external_permissions/confluence/group_sync.py b/backend/ee/danswer/external_permissions/confluence/group_sync.py
index a55bb777bc5..dd372265819 100644
--- a/backend/ee/danswer/external_permissions/confluence/group_sync.py
+++ b/backend/ee/danswer/external_permissions/confluence/group_sync.py
@@ -1,5 +1,4 @@
-from typing import Any
-
+from atlassian import Confluence  # type: ignore
 from sqlalchemy.orm import Session
 
 from danswer.connectors.confluence.onyx_confluence import OnyxConfluence
@@ -19,12 +18,8 @@ def _get_group_members_email_paginated(
     confluence_client: OnyxConfluence,
     group_name: str,
 ) -> set[str]:
-    members: list[dict[str, Any]] = []
-    for member_batch in confluence_client.paginated_group_members_retrieval(group_name):
-        members.extend(member_batch)
-
     group_member_emails: set[str] = set()
-    for member in members:
+    for member in confluence_client.paginated_group_members_retrieval(group_name):
         email = member.get("email")
         if not email:
             user_name = member.get("username")
@@ -43,19 +38,33 @@ def confluence_group_sync(
     db_session: Session,
     cc_pair: ConnectorCredentialPair,
 ) -> None:
+    credentials = cc_pair.credential.credential_json
     is_cloud = cc_pair.connector.connector_specific_config.get("is_cloud", False)
+    wiki_base = cc_pair.connector.connector_specific_config["wiki_base"]
+
+    # test connection with direct client, no retries
+    confluence_client = Confluence(
+        api_version="cloud" if is_cloud else "latest",
+        url=wiki_base.rstrip("/"),
+        username=credentials["confluence_username"] if is_cloud else None,
+        password=credentials["confluence_access_token"] if is_cloud else None,
+        token=credentials["confluence_access_token"] if not is_cloud else None,
+    )
+    spaces = confluence_client.get_all_spaces(limit=1)
+    if not spaces:
+        raise RuntimeError(f"No spaces found at {wiki_base}!")
+
     confluence_client = build_confluence_client(
-        credentials_json=cc_pair.credential.credential_json,
+        credentials_json=credentials,
         is_cloud=is_cloud,
-        wiki_base=cc_pair.connector.connector_specific_config["wiki_base"],
+        wiki_base=wiki_base,
     )
 
     # Get all group names
     group_names: list[str] = []
-    for group_batch in confluence_client.paginated_groups_retrieval():
-        for group in group_batch:
-            if group_name := group.get("name"):
-                group_names.append(group_name)
+    for group in confluence_client.paginated_groups_retrieval():
+        if group_name := group.get("name"):
+            group_names.append(group_name)
 
     # For each group name, get all members and create a danswer group
     danswer_groups: list[ExternalUserGroup] = []
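Both `load_credentials` and `confluence_group_sync` now probe the deployment with a bare, retry-free client before building the wrapped `OnyxConfluence` client, so a bad token or a wrong `wiki_base` fails immediately instead of grinding through backoff retries. A condensed sketch of that fail-fast check, using the same atlassian-python-api constructor arguments as the diff (the function name here is hypothetical):

```python
from atlassian import Confluence  # type: ignore


def validate_confluence_connection(
    wiki_base: str,
    username: str | None,
    access_token: str,
    is_cloud: bool,
) -> None:
    """Raise immediately if the credentials or wiki_base are wrong."""
    client_without_retries = Confluence(
        api_version="cloud" if is_cloud else "latest",
        url=wiki_base.rstrip("/"),
        # Cloud authenticates with username + API token (basic auth);
        # Server/Data Center uses a personal access token instead
        username=username if is_cloud else None,
        password=access_token if is_cloud else None,
        token=access_token if not is_cloud else None,
    )
    if not client_without_retries.get_all_spaces(limit=1):
        raise RuntimeError(f"No spaces found at {wiki_base}!")
```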