Skip to content

Commit

Permalink
remove index param from some methods (#1160)
Browse files · Browse the repository at this point in the history
  • Commit authorship information
anakin87 authored Oct 29, 2024
1 parent 3220330 commit 6e8ee96
Showing 1 changed file with 6 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ def write_documents(

document_objects = self._handle_duplicate_documents(
documents=documents,
index=self.index,
policy=policy,
)

Expand Down Expand Up @@ -468,7 +467,6 @@ def get_documents_generator(
def get_documents_by_id(
self,
ids: List[str],
index: Optional[str] = None,
) -> List[Document]:
"""
Retrieves documents from Qdrant by their IDs.
Expand All @@ -480,13 +478,11 @@ def get_documents_by_id(
:returns:
A list of documents.
"""
index = index or self.index

documents: List[Document] = []

ids = [convert_id(_id) for _id in ids]
records = self.client.retrieve(
collection_name=index,
collection_name=self.index,
ids=ids,
with_payload=True,
with_vectors=True,
Expand Down Expand Up @@ -987,39 +983,35 @@ def recreate_collection(
def _handle_duplicate_documents(
self,
documents: List[Document],
index: Optional[str] = None,
policy: DuplicatePolicy = None,
):
"""
Checks whether any of the passed documents is already existing in the chosen index and returns a list of
documents that are not in the index yet.
:param documents: A list of Haystack Document objects.
:param index: name of the index
:param policy: The duplicate policy to use when writing documents.
:returns: A list of Haystack Document objects.
"""

index = index or self.index
if policy in (DuplicatePolicy.SKIP, DuplicatePolicy.FAIL):
documents = self._drop_duplicate_documents(documents, index)
documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents], index=index)
documents = self._drop_duplicate_documents(documents)
documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents])
ids_exist_in_db: List[str] = [doc.id for doc in documents_found]

if len(ids_exist_in_db) > 0 and policy == DuplicatePolicy.FAIL:
msg = f"Document with ids '{', '.join(ids_exist_in_db)} already exists in index = '{index}'."
msg = f"Document with ids '{', '.join(ids_exist_in_db)} already exists in index = '{self.index}'."
raise DuplicateDocumentError(msg)

documents = list(filter(lambda doc: doc.id not in ids_exist_in_db, documents))

return documents

def _drop_duplicate_documents(self, documents: List[Document], index: Optional[str] = None) -> List[Document]:
def _drop_duplicate_documents(self, documents: List[Document]) -> List[Document]:
"""
Drop duplicate documents based on same hash ID.
:param documents: A list of Haystack Document objects.
:param index: Name of the index.
:returns: A list of Haystack Document objects.
"""
_hash_ids: Set = set()
Expand All @@ -1030,7 +1022,7 @@ def _drop_duplicate_documents(self, documents: List[Document], index: Optional[s
logger.info(
"Duplicate Documents: Document with id '%s' already exists in index '%s'",
document.id,
index or self.index,
self.index,
)
continue
_documents.append(document)
Expand Down

0 comments on commit 6e8ee96

Please sign in to comment.