skip unsupported meta fields #32

Closed
Shuntw6096 opened this issue Sep 6, 2024 · 0 comments · Fixed by #33

When I use DocumentSplitter(split_by="sentence", split_overlap=50) and then write the split documents into Milvus,
I get this error:

ValueError: Failure to create collection, unrecognized dtype for key: _split_overlap
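For reference, a minimal sketch of the kind of pipeline that triggers it (assuming a local Milvus Lite file at ./milvus.db; the input text is a placeholder):

from haystack import Document
from haystack.components.preprocessors import DocumentSplitter
from milvus_haystack import MilvusDocumentStore

document_store = MilvusDocumentStore(connection_args={"uri": "./milvus.db"}, drop_old=True)
splitter = DocumentSplitter(split_by="sentence", split_overlap=50)
splits = splitter.run(documents=[Document(content="...long text...")])["documents"]
# Each split carries meta["_split_overlap"], which Milvus cannot map to a
# collection field, so the next line raises the ValueError above.
document_store.write_documents(splits)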

I found a related issue in Haystack:

The solution there is to skip a document's unsupported meta fields: a function _discard_invalid_meta(document: Document) filters out the meta fields whose types the store cannot represent. So I applied the same approach here; my subclass is below.
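Before the subclass, a quick way to see which meta values pymilvus can and cannot type is to probe infer_dtype_bydata directly (a small sketch; the sample values are illustrative):

from pymilvus import DataType
from pymilvus.orm.types import infer_dtype_bydata

# Scalars like ints and strings map to concrete Milvus dtypes; a composite
# value such as the list of dicts in _split_overlap does not.
for value in (1, "text", [{"doc_id": "abc", "range": (0, 5)}]):
    dtype = infer_dtype_bydata(value)
    ok = dtype not in (DataType.UNKNOWN, DataType.NONE)
    print(f"{type(value).__name__}: {dtype.name} ({'supported' if ok else 'unsupported'})")

The list-of-dicts case matches the failing _split_overlap key above, which is why collection creation breaks on it.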

from typing import Any, Dict, List
from copy import deepcopy
import logging
from haystack import Document
from haystack.document_stores.types import DuplicatePolicy
from haystack.dataclasses.sparse_embedding import SparseEmbedding
from milvus_haystack import MilvusDocumentStore

logger = logging.getLogger(__name__)


class MilvusDocumentStore_(MilvusDocumentStore):
    """MilvusDocumentStore that drops unsupported meta fields before writing."""

    def write_documents(
        self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """
        Writes documents into the store.

        :param documents: A list of documents.
        :param policy: Documents with the same ID count as duplicates.
            MilvusStore only supports `DuplicatePolicy.NONE`
        :return: Number of documents written.
        """

        from pymilvus import Collection, MilvusException

        # The only change from the parent's write_documents: drop meta fields
        # that pymilvus cannot map to a dtype before writing.
        documents_cp = [
            MilvusDocumentStore_._discard_invalid_meta(doc)
            for doc in deepcopy(documents)
        ]
        if len(documents_cp) > 0 and not isinstance(documents_cp[0], Document):
            err_msg = (
                "param 'documents' must contain a list of objects of type Document"
            )
            raise ValueError(err_msg)

        if policy not in [DuplicatePolicy.NONE]:
            logger.warning(
                f"MilvusStore only supports `DuplicatePolicy.NONE`, but got {policy}. "
                "Milvus does not currently check if entity primary keys are duplicates."
                "You are responsible for ensuring entity primary keys are unique, "
                "and if they aren't Milvus may contain multiple entities with duplicate primary keys."
            )

        # Check embeddings; 128 is only a fallback dim for dummy vectors
        embedding_dim = 128
        for doc in documents_cp:
            if doc.embedding is not None:
                embedding_dim = len(doc.embedding)
                break
        empty_embedding = False
        empty_sparse_embedding = False
        for doc in documents_cp:
            if doc.embedding is None:
                empty_embedding = True
                dummy_vector = [self._dummy_value] * embedding_dim
                doc.embedding = dummy_vector
            if doc.sparse_embedding is None:
                empty_sparse_embedding = True
                dummy_sparse_vector = SparseEmbedding(
                    indices=[0],
                    values=[self._dummy_value],
                )
                doc.sparse_embedding = dummy_sparse_vector
            if doc.content is None:
                doc.content = ""
        if empty_embedding and self._sparse_vector_field is None:
            logger.warning(
                "Milvus is a purely vector database, but document has no embedding. "
                "A dummy embedding will be used, but this can AFFECT THE SEARCH RESULTS!!! "
                "Please calculate the embedding in each document first, and then write them to Milvus Store."
            )
        if empty_sparse_embedding and self._sparse_vector_field is not None:
            logger.warning(
                "You specified `sparse_vector_field`, but document has no sparse embedding. "
                "A dummy sparse embedding will be used, but this can AFFECT THE SEARCH RESULTS!!! "
                "Please calculate the sparse embedding in each document first, and then write them to Milvus Store."
            )

        embeddings = [doc.embedding for doc in documents_cp]
        sparse_embeddings = [
            self._convert_sparse_to_dict(doc.sparse_embedding) for doc in documents_cp
        ]
        metas = [doc.meta for doc in documents_cp]
        texts = [doc.content for doc in documents_cp]
        ids = [doc.id for doc in documents_cp]

        if len(embeddings) == 0:
            logger.debug("Nothing to insert, skipping.")
            return 0

        # If the collection hasn't been initialized yet, perform all steps to do so
        kwargs: Dict[str, Any] = {}
        if not isinstance(self.col, Collection):
            kwargs = {"embeddings": embeddings, "metas": metas}
            if self.partition_names:
                kwargs["partition_names"] = self.partition_names
            if self.replica_number:
                kwargs["replica_number"] = self.replica_number
            if self.timeout:
                kwargs["timeout"] = self.timeout
            self._init(**kwargs)

        # Dict to hold all insert columns
        insert_dict: Dict[str, List] = {
            self._text_field: texts,
            self._vector_field: embeddings,
            self._primary_field: ids,
        }
        if self._sparse_vector_field:
            insert_dict[self._sparse_vector_field] = sparse_embeddings

        # Collect the meta into the insert dict.
        if metas is not None:
            for d in metas:
                for key, value in d.items():
                    if key in self.fields:
                        insert_dict.setdefault(key, []).append(value)

        # Total insert count
        vectors: list = insert_dict[self._vector_field]
        total_count = len(vectors)

        batch_size = 1000
        wrote_ids = []
        if not isinstance(self.col, Collection):
            raise MilvusException(message="Collection is not initialized")
        for i in range(0, total_count, batch_size):
            # Grab end index
            end = min(i + batch_size, total_count)
            # Convert dict to list of lists batch for insertion
            insert_list = [insert_dict[x][i:end] for x in self.fields]
            # Insert into the collection.
            try:
                # res: MutationResult (carries the primary keys of inserted entities)
                res = self.col.insert(insert_list, timeout=None, **kwargs)
                wrote_ids.extend(res.primary_keys)
            except MilvusException as err:
                logger.error(
                    "Failed to insert batch starting at entity: %s/%s", i, total_count
                )
                raise err
        return len(wrote_ids)

    @staticmethod
    def _discard_invalid_meta(document: Document):
        """
        Remove metadata fields with unsupported types from the document.
        """
        from pymilvus import DataType

        from pymilvus.orm.types import infer_dtype_bydata

        if document.meta:
            discarded_keys = []
            new_meta = {}
            for key, value in document.meta.items():
                dtype = infer_dtype_bydata(value)
                if dtype in (DataType.UNKNOWN, DataType.NONE):
                    discarded_keys.append(key)
                else:
                    new_meta[key] = value

            if discarded_keys:
                msg = (
                    f"Document {document.id} has metadata fields with unsupported types: {discarded_keys}. "
                    f"Supported types refer to Pymilvus DataType. The values of these fields will be discarded."
                )
                logger.warning(msg)
            document.meta = new_meta

        return document
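
With this subclass, writing the split documents succeeds and the unsupported fields are logged and dropped. A usage sketch (the connection uri and text are placeholders):

from haystack import Document
from haystack.components.preprocessors import DocumentSplitter

document_store = MilvusDocumentStore_(connection_args={"uri": "./milvus.db"}, drop_old=True)
splits = DocumentSplitter(split_by="sentence", split_overlap=50).run(
    documents=[Document(content="...long text...")]
)["documents"]
# meta["_split_overlap"] is discarded with a warning instead of raising.
written = document_store.write_documents(splits)
print(f"wrote {written} documents")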