When I use `DocumentSplitter(split_by="sentence", split_overlap=50)` and write the resulting documents into Milvus, I get an error. I found this issue in Haystack: `_split_overlap` in `meta` is incompatible with some Document Stores (deepset-ai/haystack#8181). The fix there is to skip a document's unsupported meta fields: it uses a function `_discard_invalid_meta(document: Document)` to filter them out. So I use the same solution here.
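For context, a quick way to see why the field is rejected is to run pymilvus's type inference on a `_split_overlap`-style value directly. This is a minimal sketch; the exact payload shape below is an illustrative assumption (DocumentSplitter stores overlap info as a list of dicts):

```python
# Sketch: why `_split_overlap` is an unsupported meta field.
from pymilvus import DataType
from pymilvus.orm.types import infer_dtype_bydata

print(infer_dtype_bydata("report.pdf"))  # a plain string maps to a supported type (VARCHAR)

overlap = [{"doc_id": "abc123", "range": (0, 50)}]  # hypothetical _split_overlap value
# A list of dicts has no corresponding Milvus DataType; the inferred dtype is
# expected to be DataType.UNKNOWN, which is why the field must be discarded.
print(infer_dtype_bydata(overlap))
```

My subclass below overrides `write_documents` to apply exactly this filtering before insertion: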
```python
from typing import Any, Dict, List
from copy import deepcopy
import logging

from haystack import Document
from haystack.document_stores.types import DuplicatePolicy
from haystack.dataclasses.sparse_embedding import SparseEmbedding
from milvus_haystack import MilvusDocumentStore

logger = logging.getLogger(__name__)


class MilvusDocumentStore_(MilvusDocumentStore):
    def write_documents(
        self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """
        Writes documents into the store.

        :param documents: A list of documents.
        :param policy: Documents with the same ID count as duplicates.
            MilvusStore only supports `DuplicatePolicy.NONE`
        :return: Number of documents written.
        """
        from pymilvus import Collection, MilvusException

        # Only modification in this method: discard unsupported meta fields
        # before writing.
        documents_cp = [
            MilvusDocumentStore_._discard_invalid_meta(doc)
            for doc in deepcopy(documents)
        ]
        if len(documents_cp) > 0 and not isinstance(documents_cp[0], Document):
            err_msg = (
                "param 'documents' must contain a list of objects of type Document"
            )
            raise ValueError(err_msg)

        if policy not in [DuplicatePolicy.NONE]:
            logger.warning(
                f"MilvusStore only supports `DuplicatePolicy.NONE`, but got {policy}. "
                "Milvus does not currently check if entity primary keys are duplicates. "
                "You are responsible for ensuring entity primary keys are unique, "
                "and if they aren't Milvus may contain multiple entities with duplicate primary keys."
            )

        # Check embeddings
        embedding_dim = 128
        for doc in documents_cp:
            if doc.embedding is not None:
                embedding_dim = len(doc.embedding)
                break
        empty_embedding = False
        empty_sparse_embedding = False
        for doc in documents_cp:
            if doc.embedding is None:
                empty_embedding = True
                dummy_vector = [self._dummy_value] * embedding_dim
                doc.embedding = dummy_vector
            if doc.sparse_embedding is None:
                empty_sparse_embedding = True
                dummy_sparse_vector = SparseEmbedding(
                    indices=[0],
                    values=[self._dummy_value],
                )
                doc.sparse_embedding = dummy_sparse_vector
            if doc.content is None:
                doc.content = ""
        if empty_embedding and self._sparse_vector_field is None:
            logger.warning(
                "Milvus is a purely vector database, but document has no embedding. "
                "A dummy embedding will be used, but this can AFFECT THE SEARCH RESULTS!!! "
                "Please calculate the embedding in each document first, and then write them to Milvus Store."
            )
        if empty_sparse_embedding and self._sparse_vector_field is not None:
            logger.warning(
                "You specified `sparse_vector_field`, but document has no sparse embedding. "
                "A dummy sparse embedding will be used, but this can AFFECT THE SEARCH RESULTS!!! "
                "Please calculate the sparse embedding in each document first, and then write them to Milvus Store."
            )

        embeddings = [doc.embedding for doc in documents_cp]
        sparse_embeddings = [
            self._convert_sparse_to_dict(doc.sparse_embedding) for doc in documents_cp
        ]
        metas = [doc.meta for doc in documents_cp]
        texts = [doc.content for doc in documents_cp]
        ids = [doc.id for doc in documents_cp]

        if len(embeddings) == 0:
            logger.debug("Nothing to insert, skipping.")
            return 0

        # If the collection hasn't been initialized yet, perform all steps to do so
        kwargs: Dict[str, Any] = {}
        if not isinstance(self.col, Collection):
            kwargs = {"embeddings": embeddings, "metas": metas}
            if self.partition_names:
                kwargs["partition_names"] = self.partition_names
            if self.replica_number:
                kwargs["replica_number"] = self.replica_number
            if self.timeout:
                kwargs["timeout"] = self.timeout
            self._init(**kwargs)

        # Dict to hold all insert columns
        insert_dict: Dict[str, List] = {
            self._text_field: texts,
            self._vector_field: embeddings,
            self._primary_field: ids,
        }
        if self._sparse_vector_field:
            insert_dict[self._sparse_vector_field] = sparse_embeddings

        # Collect the meta into the insert dict.
        if metas is not None:
            for d in metas:
                for key, value in d.items():
                    if key in self.fields:
                        insert_dict.setdefault(key, []).append(value)

        # Total insert count
        vectors: list = insert_dict[self._vector_field]
        total_count = len(vectors)

        batch_size = 1000
        wrote_ids = []
        if not isinstance(self.col, Collection):
            raise MilvusException(message="Collection is not initialized")
        for i in range(0, total_count, batch_size):
            # Grab end index
            end = min(i + batch_size, total_count)
            # Convert dict to list of lists batch for insertion
            insert_list = [insert_dict[x][i:end] for x in self.fields]
            # Insert into the collection.
            try:
                # res: Collection
                res = self.col.insert(insert_list, timeout=None, **kwargs)
                wrote_ids.extend(res.primary_keys)
            except MilvusException as err:
                logger.error(
                    "Failed to insert batch starting at entity: %s/%s", i, total_count
                )
                raise err
        return len(wrote_ids)

    @staticmethod
    def _discard_invalid_meta(document: Document):
        """
        Remove metadata fields with unsupported types from the document.
        """
        from pymilvus import DataType
        from pymilvus.orm.types import infer_dtype_bydata

        if document.meta:
            discarded_keys = []
            new_meta = {}
            for key, value in document.meta.items():
                dtype = infer_dtype_bydata(value)
                if dtype in (DataType.UNKNOWN, DataType.NONE):
                    discarded_keys.append(key)
                else:
                    new_meta[key] = value
            if discarded_keys:
                msg = (
                    f"Document {document.id} has metadata fields with unsupported types: {discarded_keys}. "
                    f"Supported types refer to Pymilvus DataType. "
                    f"The values of these fields will be discarded."
                )
                logger.warning(msg)
            document.meta = new_meta
        return document
```
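A usage sketch of the subclass: the connection URI and splitter settings below are illustrative, and in a real pipeline you would run an embedder over the splits before writing (otherwise the dummy-vector warning above fires):

```python
from haystack import Document
from haystack.components.preprocessors import DocumentSplitter

# Assumed local Milvus instance; adjust connection_args for your deployment.
document_store = MilvusDocumentStore_(
    connection_args={"uri": "http://localhost:19530"},
    drop_old=True,
)

# Sentence splitting with overlap adds `_split_overlap` to each split's meta.
splitter = DocumentSplitter(split_by="sentence", split_length=100, split_overlap=50)
splits = splitter.run(documents=[Document(content="A long text. " * 200)])

# The overridden write_documents drops the unsupported meta fields,
# so this no longer errors out.
written = document_store.write_documents(splits["documents"])
print(f"wrote {written} documents")
```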