Refactored Confluence Connector (#2859)
* Refactored Confluence Connector

* rename metadataconnector to slimconnector

Finish rename

* danswer->onyx

* added rec

* typo

* refactored doc_sync for confluence

* mypy + enable tests

* tested and fixed for confluence cloud

* fixed all server syncing

* fixed connector test

* mypy+connector test fixes

* addressed richards comments

* minor fix
hagen-danswer authored Oct 21, 2024
1 parent c516f35 commit 802086e
Showing 22 changed files with 754 additions and 1,158 deletions.
34 changes: 19 additions & 15 deletions backend/danswer/background/celery/celery_utils.py
@@ -14,9 +14,9 @@
     rate_limit_builder,
 )
 from danswer.connectors.interfaces import BaseConnector
-from danswer.connectors.interfaces import IdConnector
 from danswer.connectors.interfaces import LoadConnector
 from danswer.connectors.interfaces import PollConnector
+from danswer.connectors.interfaces import SlimConnector
 from danswer.connectors.models import Document
 from danswer.db.connector_credential_pair import get_connector_credential_pair
 from danswer.db.engine import get_session_with_tenant
@@ -67,7 +67,9 @@ def get_deletion_attempt_snapshot(
     )


-def document_batch_to_ids(doc_batch: list[Document]) -> set[str]:
+def document_batch_to_ids(
+    doc_batch: list[Document],
+) -> set[str]:
     return {doc.id for doc in doc_batch}


@@ -83,10 +85,13 @@ def extract_ids_from_runnable_connector(
     """
     all_connector_doc_ids: set[str] = set()

+    if isinstance(runnable_connector, SlimConnector):
+        for metadata_batch in runnable_connector.retrieve_all_slim_documents():
+            all_connector_doc_ids.update({doc.id for doc in metadata_batch})
+
     doc_batch_generator = None
-    if isinstance(runnable_connector, IdConnector):
-        all_connector_doc_ids = runnable_connector.retrieve_all_source_ids()
-    elif isinstance(runnable_connector, LoadConnector):
+
+    if isinstance(runnable_connector, LoadConnector):
         doc_batch_generator = runnable_connector.load_from_state()
     elif isinstance(runnable_connector, PollConnector):
         start = datetime(1970, 1, 1, tzinfo=timezone.utc).timestamp()
@@ -95,16 +100,15 @@
     else:
         raise RuntimeError("Pruning job could not find a valid runnable_connector.")

-    if doc_batch_generator:
-        doc_batch_processing_func = document_batch_to_ids
-        if MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE:
-            doc_batch_processing_func = rate_limit_builder(
-                max_calls=MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE, period=60
-            )(document_batch_to_ids)
-        for doc_batch in doc_batch_generator:
-            if progress_callback:
-                progress_callback(len(doc_batch))
-            all_connector_doc_ids.update(doc_batch_processing_func(doc_batch))
+    doc_batch_processing_func = document_batch_to_ids
+    if MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE:
+        doc_batch_processing_func = rate_limit_builder(
+            max_calls=MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE, period=60
+        )(document_batch_to_ids)
+    for doc_batch in doc_batch_generator:
+        if progress_callback:
+            progress_callback(len(doc_batch))
+        all_connector_doc_ids.update(doc_batch_processing_func(doc_batch))

     return all_connector_doc_ids

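For reference, the call sites above pin down roughly what the new SlimConnector must provide: a generator of batches of lightweight documents carrying at least an `id`. A minimal sketch of that interface, inferred only from this diff (the `SlimDocument` name and exact base-class layout are assumptions, not the repository's actual definitions):

from abc import ABC, abstractmethod
from collections.abc import Generator
from dataclasses import dataclass


@dataclass
class SlimDocument:
    # Hypothetical minimal payload; the code above only requires an `id`.
    id: str


class SlimConnector(ABC):
    @abstractmethod
    def retrieve_all_slim_documents(
        self,
    ) -> Generator[list[SlimDocument], None, None]:
        # Yield batches of ID-only documents, as consumed by
        # extract_ids_from_runnable_connector above.
        raise NotImplementedError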
2 changes: 1 addition & 1 deletion backend/danswer/background/celery/tasks/pruning/tasks.py
@@ -242,7 +242,7 @@ def redis_increment_callback(amount: int) -> None:
     runnable_connector = instantiate_connector(
         db_session,
         cc_pair.connector.source,
-        InputType.PRUNE,
+        InputType.SLIM_RETRIEVAL,
         cc_pair.connector.connector_specific_config,
         cc_pair.credential,
     )
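Pruning now instantiates the connector in SLIM_RETRIEVAL mode, pulls the full set of source document IDs, and compares it against what is already indexed. The set arithmetic behind pruning is simple; a sketch under assumed names (these helpers are illustrative, not the repository's actual API):

def compute_docs_to_prune(
    source_doc_ids: set[str],
    indexed_doc_ids: set[str],
) -> set[str]:
    # Anything the index still holds but the source no longer returns
    # is stale and should be deleted.
    return indexed_doc_ids - source_doc_ids


# Example: doc "b" was removed at the source, so it gets pruned.
assert compute_docs_to_prune({"a", "c"}, {"a", "b", "c"}) == {"b"}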
6 changes: 0 additions & 6 deletions backend/danswer/configs/app_configs.py
@@ -253,12 +253,6 @@
     os.environ.get("CONFLUENCE_CONNECTOR_INDEX_ARCHIVED_PAGES", "").lower() == "true"
 )

-# Save pages labels as Danswer metadata tags
-# The reason to skip this would be to reduce the number of calls to Confluence due to rate limit concerns
-CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING = (
-    os.environ.get("CONFLUENCE_CONNECTOR_SKIP_LABEL_INDEXING", "").lower() == "true"
-)
-
 # Attachments exceeding this size will not be retrieved (in bytes)
 CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD = int(
     os.environ.get("CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD", 10 * 1024 * 1024)
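The deleted flag followed the same opt-in convention as the rest of this config file: a feature is enabled only when its environment variable is the string "true" (case-insensitive). A minimal illustration of the pattern (the variable name here is made up):

import os

# Unset, empty, or anything other than "true" (any casing) leaves this False.
EXAMPLE_FEATURE_ENABLED = (
    os.environ.get("EXAMPLE_FEATURE_ENABLED", "").lower() == "true"
)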
4 changes: 2 additions & 2 deletions backend/danswer/connectors/README.md
@@ -13,8 +13,8 @@ Connectors come in 3 different flows:
     documents via a connector's API or loads the documents from some sort of a dump file.
 - Poll connector:
   - Incrementally updates documents based on a provided time range. It is used by the background job to pull the latest
-    changes additions and changes since the last round of polling. This connector helps keep the document index up to date
-    without needing to fetch/embed/index every document which generally be too slow to do frequently on large sets of
+    changes and additions since the last round of polling. This connector helps keep the document index up to date
+    without needing to fetch/embed/index every document which would be too slow to do frequently on large sets of
     documents.
 - Event Based connectors:
   - Connectors that listen to events and update documents accordingly.
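For a concrete sense of the poll flow described in that README diff, here is a hedged, self-contained sketch of driving a poll-style connector over a recent window. It assumes poll_source(start, end) takes Unix-epoch seconds, consistent with the timestamp() usage in the celery_utils diff earlier in this commit; the FakePollConnector is a stand-in, not the real interface:

from collections.abc import Generator
from datetime import datetime, timezone


class FakePollConnector:
    # Stand-in for a real PollConnector; yields one hard-coded batch.
    def poll_source(
        self, start: float, end: float
    ) -> Generator[list[str], None, None]:
        yield [f"doc-updated-between-{int(start)}-and-{int(end)}"]


# Illustrative only: poll for everything changed in the last hour.
end = datetime.now(tz=timezone.utc).timestamp()
start = end - 3600
for doc_batch in FakePollConnector().poll_source(start, end):
    print(doc_batch)  # in a real run, each batch would go to the indexing pipeline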
32 changes: 0 additions & 32 deletions backend/danswer/connectors/confluence/confluence_utils.py

This file was deleted.
