Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: Qdrant - remove index parameter from methods #1160

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ def write_documents(

document_objects = self._handle_duplicate_documents(
documents=documents,
index=self.index,
policy=policy,
)

Expand Down Expand Up @@ -468,7 +467,6 @@ def get_documents_generator(
def get_documents_by_id(
self,
ids: List[str],
index: Optional[str] = None,
) -> List[Document]:
"""
Retrieves documents from Qdrant by their IDs.
Expand All @@ -480,13 +478,11 @@ def get_documents_by_id(
:returns:
A list of documents.
"""
index = index or self.index

documents: List[Document] = []

ids = [convert_id(_id) for _id in ids]
records = self.client.retrieve(
collection_name=index,
collection_name=self.index,
ids=ids,
with_payload=True,
with_vectors=True,
Expand Down Expand Up @@ -987,39 +983,35 @@ def recreate_collection(
def _handle_duplicate_documents(
self,
documents: List[Document],
index: Optional[str] = None,
policy: DuplicatePolicy = None,
):
"""
Checks whether any of the passed documents already exists in the chosen index and returns a list of
documents that are not in the index yet.

:param documents: A list of Haystack Document objects.
:param index: name of the index
:param policy: The duplicate policy to use when writing documents.
:returns: A list of Haystack Document objects.
"""

index = index or self.index
if policy in (DuplicatePolicy.SKIP, DuplicatePolicy.FAIL):
documents = self._drop_duplicate_documents(documents, index)
documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents], index=index)
documents = self._drop_duplicate_documents(documents)
documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents])
ids_exist_in_db: List[str] = [doc.id for doc in documents_found]

if len(ids_exist_in_db) > 0 and policy == DuplicatePolicy.FAIL:
msg = f"Document with ids '{', '.join(ids_exist_in_db)} already exists in index = '{index}'."
msg = f"Document with ids '{', '.join(ids_exist_in_db)} already exists in index = '{self.index}'."
raise DuplicateDocumentError(msg)

documents = list(filter(lambda doc: doc.id not in ids_exist_in_db, documents))

return documents

def _drop_duplicate_documents(self, documents: List[Document], index: Optional[str] = None) -> List[Document]:
def _drop_duplicate_documents(self, documents: List[Document]) -> List[Document]:
"""
Drop duplicate documents based on same hash ID.

:param documents: A list of Haystack Document objects.
:param index: Name of the index.
:returns: A list of Haystack Document objects.
"""
_hash_ids: Set = set()
Expand All @@ -1030,7 +1022,7 @@ def _drop_duplicate_documents(self, documents: List[Document], index: Optional[s
logger.info(
"Duplicate Documents: Document with id '%s' already exists in index '%s'",
document.id,
index or self.index,
self.index,
)
continue
_documents.append(document)
Expand Down