diff --git a/.github/workflows/pgvector.yml b/.github/workflows/pgvector.yml new file mode 100644 index 000000000..e99bdc3e5 --- /dev/null +++ b/.github/workflows/pgvector.yml @@ -0,0 +1,49 @@ +# This workflow comes from https://github.com/ofek/hatch-mypyc +# https://github.com/ofek/hatch-mypyc/blob/5a198c0ba8660494d02716cfc9d79ce4adfb1442/.github/workflows/test.yml +name: Test / pgvector + +on: + schedule: + - cron: "0 0 * * *" + pull_request: + paths: + - "integrations/pgvector/**" + - ".github/workflows/pgvector.yml" + +concurrency: + group: pgvector-${{ github.head_ref }} + cancel-in-progress: true + +env: + PYTHONUNBUFFERED: "1" + FORCE_COLOR: "1" + +jobs: + run: + name: Python ${{ matrix.python-version }} on ${{ startsWith(matrix.os, 'macos-') && 'macOS' || startsWith(matrix.os, 'windows-') && 'Windows' || 'Linux' }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + python-version: ["3.9","3.10","3.11"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install Hatch + run: pip install --upgrade hatch + + - name: Lint + working-directory: integrations/pgvector + if: matrix.python-version == '3.9' + run: hatch run lint:all + + - name: Run tests + working-directory: integrations/pgvector + run: hatch run cov diff --git a/integrations/pgvector/README.md b/integrations/pgvector/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/integrations/pgvector/pyproject.toml b/integrations/pgvector/pyproject.toml new file mode 100644 index 000000000..266115bfe --- /dev/null +++ b/integrations/pgvector/pyproject.toml @@ -0,0 +1,182 @@ +[build-system] +requires = ["hatchling", "hatch-vcs"] +build-backend = "hatchling.build" + +[project] +name = "pgvector_haystack" +dynamic = ["version"] +description = "" +readme = "README.md" +requires-python = ">=3.7" +license = "Apache-2.0" +keywords = [] +authors = [ + { name = "Siddharth Sahu", email = "siddharth.plaksha@gmail.com" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Programming Language :: Python", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dependencies = [ + "haystack-ai", + "vecs>=0.4.2", +] + +[project.urls] +Documentation = "https://github.com/unknown/example-store#readme" +Issues = "https://github.com/unknown/example-store/issues" +Source = "https://github.com/unknown/example-store" + +[tool.hatch.version] +source = "vcs" +tag-pattern = 'integrations\/pgvector-v(?P.*)' + +[tool.hatch.version.raw-options] +root = "../.." +git_describe_command = 'git describe --tags --match="integrations/pgvector-v[0-9]*"' + +[tool.hatch.envs.default] +dependencies = [ + "coverage[toml]>=6.5", + "pytest", +] +[tool.hatch.envs.default.scripts] +test = "pytest {args:tests}" +test-cov = "coverage run -m pytest {args:tests}" +cov-report = [ + "- coverage combine", + "coverage report", +] +cov = [ + "test-cov", + "cov-report", +] + +[[tool.hatch.envs.all.matrix]] +python = ["3.7", "3.8", "3.9", "3.10", "3.11"] + +[tool.hatch.envs.lint] +detached = true +dependencies = [ + "black>=23.1.0", + "mypy>=1.0.0", + "ruff>=0.0.243", +] +[tool.hatch.envs.lint.scripts] +typing = "mypy --install-types --non-interactive {args:src/example_store tests}" +style = [ + "ruff {args:.}", + "black --check --diff {args:.}", +] +fmt = [ + "black {args:.}", + "ruff --fix {args:.}", + "style", +] +all = [ + "style", + "typing", +] + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.black] +target-version = ["py37"] +line-length = 120 +skip-string-normalization = true + +[tool.ruff] +target-version = "py37" +line-length = 120 +select = [ + "A", + "ARG", + "B", + "C", + "DTZ", + "E", + "EM", + "F", + "FBT", + "I", + "ICN", + "ISC", + "N", + "PLC", + "PLE", + "PLR", + "PLW", + "Q", + "RUF", + "S", + "T", + "TID", + "UP", + "W", + "YTT", +] +ignore = [ + # Allow non-abstract empty methods in abstract base classes + "B027", + # Allow boolean positional values in function calls, like `dict.get(... True)` + "FBT003", + # Ignore checks for possible passwords + "S105", "S106", "S107", + # Ignore complexity + "C901", "PLR0911", "PLR0912", "PLR0913", "PLR0915", +] +unfixable = [ + # Don't touch unused imports + "F401", +] + +[tool.ruff.isort] +known-first-party = ["example_store"] + +[tool.ruff.flake8-tidy-imports] +ban-relative-imports = "all" + +[tool.ruff.per-file-ignores] +# Tests can use magic values, assertions, and relative imports +"tests/**/*" = ["PLR2004", "S101", "TID252"] + +[tool.coverage.run] +source_pkgs = ["example_store", "tests"] +branch = true +parallel = true +omit = [ + "src/example_store/__about__.py", +] + +[tool.coverage.paths] +example_store = ["src/example_store", "*/example-store/src/example_store"] +tests = ["tests", "*/example-store/tests"] + +[tool.coverage.report] +exclude_lines = [ + "no cov", + "if __name__ == .__main__.:", + "if TYPE_CHECKING:", +] + +[tool.pytest.ini_options] +minversion = "6.0" +markers = [ + "unit: unit tests", + "integration: integration tests" +] + +[[tool.mypy.overrides]] +module = [ + "haystack.*", + "pytest.*" +] +ignore_missing_imports = true \ No newline at end of file diff --git a/integrations/pgvector/src/pgvector_haystack/__init__.py b/integrations/pgvector/src/pgvector_haystack/__init__.py new file mode 100644 index 000000000..a63076ca9 --- /dev/null +++ b/integrations/pgvector/src/pgvector_haystack/__init__.py @@ -0,0 +1,6 @@ +# SPDX-FileCopyrightText: 2023-present John Doe +# +# SPDX-License-Identifier: Apache-2.0 +from pgvector_haystack.document_store import PGvectorDocumentStore + +__all__ = ["PGvectorDocumentStore"] diff --git a/integrations/pgvector/src/pgvector_haystack/document_store.py b/integrations/pgvector/src/pgvector_haystack/document_store.py new file mode 100644 index 000000000..17f97febb --- /dev/null +++ b/integrations/pgvector/src/pgvector_haystack/document_store.py @@ -0,0 +1,275 @@ +# SPDX-FileCopyrightText: 2023-present John Doe +# +# SPDX-License-Identifier: Apache-2.0 +import logging +from copy import copy +from typing import Any, Dict, List, Optional + +import numpy as np +import vecs +from haystack.dataclasses import Document +from haystack.document_stores.errors import DuplicateDocumentError, MissingDocumentError +from haystack.document_stores.protocol import DuplicatePolicy + +logger = logging.getLogger(__name__) + + +class PGvectorDocumentStore: + def __init__( + self, + user:str, + password:str, + host:str, + port:str, + db_name:str, + collection_name:str = "documents", + dimension:int = 768, + **collection_creation_kwargs, + ): + """ + Creates a new PGvectorDocumentStore instance. + + For more information on connection parameters, see the official PGvector documentation: https://supabase.github.io/vecs/0.4/ + + :param user: The username for connecting to the PostgreSQL database. + :param password: The password for connecting to the PostgreSQL database. + :param host: The host address of the PostgreSQL database server. + :param port: The port number on which the PostgreSQL database server is listening. + :param db_name: The name of the PostgreSQL database. + :param collection_name: The name of the collection or table in the database where vectors will be stored. + :param dimension: The dimensionality of the vectors to be stored in the document store. + :param **collection_creation_kwargs: Optional arguments that ``PGvector Document Store`` takes. + """ + self._collection_name = collection_name + self._dummy_vector = [0.0]*dimension + self._adapter = collection_creation_kwargs["adapter"] + db_connection = f"postgresql://{user}:{password}@{host}:{port}/{db_name}" + self._pgvector_client = vecs.create_client(db_connection) + self._collection = self._pgvector_client.get_or_create_collection(name=collection_name, dimension=dimension, **collection_creation_kwargs) + + + def count_documents(self) -> int: + """ + Returns how many documents are present in the document store. + """ + return self._collection.__len__() + + + def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]: + """ + Returns the documents that match the filters provided. + + Filters are defined as nested dictionaries that can be of two types: + - Comparison + - Logic + + Comparison dictionaries must contain the keys: + + - `field` + - `operator` + - `value` + + Logic dictionaries must contain the keys: + + - `operator` + - `conditions` + + The `conditions` key must be a list of dictionaries, either of type Comparison or Logic. + + The `operator` value in Comparison dictionaries must be one of: + + - `==` + - `!=` + - `>` + - `>=` + - `<` + - `<=` + - `in` + + The `operator` values in Logic dictionaries must be one of: + + - `OR` + - `AND` + + + A simple filter: + ```python + filters = {"field": "meta.type", "operator": "==", "value": "article"} + ``` + + A more complex filter: + ```python + filters = { + "operator": "AND", + "conditions": [ + {"field": "meta.type", "operator": "==", "value": "article"}, + {"field": "meta.date", "operator": ">=", "value": 1420066800}, + {"field": "meta.date", "operator": "<", "value": 1609455600}, + {"field": "meta.rating", "operator": ">=", "value": 3}, + { + "operator": "OR", + "conditions": [ + {"field": "meta.genre", "operator": "in", "value": ["economy", "politics"]}, + {"field": "meta.publisher", "operator": "==", "value": "nytimes"}, + ], + }, + ], + } + + :param filters: the filters to apply to the document list. + :return: a list of Documents that match the given filters. + """ + if filters and not isinstance(filters, dict): + msg = "Filter must be a dictionary" + raise ValueError(msg) + + filters = self._normalize_filters(filters) + + # pgvector store performs vector similarity search + # here we are querying with a dummy vector and the max compatible top_k + documents = self._embedding_retrieval( + query_embedding=self._dummy_vector, + filters=filters, + ) + + return self._convert_query_result_to_documents(documents) + + + def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int: + """ + Writes (or overwrites) documents into the store. + + :param documents: a list of documents. + :param policy: The duplicate policy to use when writing documents. + PGvectorDocumentStore only supports `DuplicatePolicy.OVERWRITE`. + + :return: None + """ + if policy not in [DuplicatePolicy.NONE, DuplicatePolicy.OVERWRITE]: + logger.warning( + f"PGvectorDocumentStore only supports `DuplicatePolicy.OVERWRITE`" + f"but got {policy}. Overwriting duplicates is enabled by default." + ) + + + for doc in documents: + if not isinstance(doc, Document): + msg = "param 'documents' must contain a list of objects of type Document" + raise ValueError(msg) + if doc.content is None: + logger.warning( + "PGvectorDocumentStore can only store the text field of Documents: " + "'array', 'dataframe' and 'blob' will be dropped." + ) + + if self._adapter is not None: + data = (doc.id, doc.content, {"content":doc.content, **doc.meta}) + self._collection.upsert(records=[data]) + else: + embedding = copy(doc.embedding) + if doc.embedding is None: + logger.warning( + f"Document {doc.id} has no embedding. pgvector is a purely vector database. " + "A dummy embedding will be used, but this can affect the search results. " + ) + embedding = self._dummy_vector + + data = (doc.id, embedding, {"content":doc.content, **doc.meta}) + self._collection.upsert(records=[data]) + + + def delete_documents(self, document_ids: List[str]) -> None: + """ + Deletes all documents with a matching document_ids from the document store. + + :param document_ids: the document ids to delete + """ + self._collection.delete(document_ids) + + + def _convert_query_result_to_documents(self, result) -> List[Document]: + """ + Helper function to convert Chroma results into Haystack Documents + """ + documents = [] + for i in result: + document_dict: Dict[str, Any] = {"id":i[0]} + document_dict["embedding"] = np.array(i[1]) + metadata = i[2] + document_dict["content"] = metadata["content"] + del metadata["content"] + document_dict["meta"] = metadata + documents.append(Document.from_dict(dict)) + + return documents + + + def _embedding_retrieval( + self, + query_embedding: List[float], + *, + filters: Optional[Dict[str, Any]], + top_k: int = 10 + ) -> List[Document]: + """ + Retrieves documents that are most similar to the query embedding using a vector similarity metric. + + :param query_embedding: Embedding of the query. + :param filters: Filters applied to the retrieved Documents. Defaults to None. + :param top_k: Maximum number of Documents to return, defaults to 10 + + :return: List of Document that are most similar to `query_embedding` + """ + if not query_embedding: + msg = "query_embedding must be a non-empty list of floats" + raise ValueError(msg) + + filters = self._normalize_filters(filters) + + results = self._collection.query( + data=query_embedding, + limit=top_k, + filters=filters, + include_value=True, + include_metadata=True + ) + + return self._convert_query_result_to_documents(result=results) + + + def _normalize_filters(self, filters: Dict[str, Any]) -> Dict[str, Any]: + """ + Translate Haystack filters to pgvector filters. It returns a dictionary. + """ + if filters and not isinstance(filters, dict): + msg = "Filter must be a dictionary" + raise ValueError(msg) + + operator_mapping = { + "==": "$eq", + "!=": "$ne", + ">": "$gt", + ">=": "$gte", + "<": "$lt", + "<=": "$lte", + "in": "$in", + "AND": "$and", + "OR": "$or" + } + + def convert(filters: Dict[str, Any]) -> Any: + op = filters.get("operator") + if op not in operator_mapping: + msg = f"{op} not supported in pgvector metadata filtering" + raise ValueError(msg) + + if "conditions" in filters: + # Recursive call for nested conditions + return {operator_mapping[op]: [convert(cond) for cond in filters["conditions"]]} + else: + # Simple statement + field = filters["field"] + value = filters["value"] + return {field: {operator_mapping[op]: value}} + + return convert(filters) diff --git a/integrations/pgvector/src/pgvector_haystack/retriever.py b/integrations/pgvector/src/pgvector_haystack/retriever.py new file mode 100644 index 000000000..dc666ff61 --- /dev/null +++ b/integrations/pgvector/src/pgvector_haystack/retriever.py @@ -0,0 +1,47 @@ +# SPDX-FileCopyrightText: 2023-present John Doe +# +# SPDX-License-Identifier: Apache-2.0 +from typing import Any, Dict, Optional + +from haystack import component + +from pgvector_haystack import PGvectorDocumentStore + + +@component +class PGvectorQueryRetriever: + """ + A component for retrieving documents from an PGvectorDocumentStore. + """ + + def __init__( + self, + *, + document_store: PGvectorDocumentStore, + filters: Optional[Dict[str, Any]] = None, + top_k: int = 10, + ): + """ + Create an PGvectorRetriever component. + + :param document_store: An instance of PGvectorDocumentStore + :param filters: A dictionary with filters to narrow down the search space (default is None). + :param top_k: The maximum number of documents to retrieve default is 10. + + :raises ValueError: If the specified top_k is not > 0. + """ + if top_k <= 0: + msg = "top_k must be greater than zero" + raise ValueError(msg) + self.filters = filters + self.top_k = top_k + self.document_store = document_store + + def run(self, _): + """ + Run the Retriever on the given input data. + + :param data: The input data for the retriever. In this case, a list of queries. + :return: The retrieved documents. + """ + return [] # FIXME diff --git a/integrations/pgvector/tests/__init__.py b/integrations/pgvector/tests/__init__.py new file mode 100644 index 000000000..7eda7517e --- /dev/null +++ b/integrations/pgvector/tests/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2023-present John Doe +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/integrations/pgvector/tests/test_document_store.py b/integrations/pgvector/tests/test_document_store.py new file mode 100644 index 000000000..29b548ec5 --- /dev/null +++ b/integrations/pgvector/tests/test_document_store.py @@ -0,0 +1,23 @@ +# SPDX-FileCopyrightText: 2023-present John Doe +# +# SPDX-License-Identifier: Apache-2.0 +from unittest.mock import patch + +import numpy as np +import pytest +from haystack import Document +from haystack.testing.document_store import CountDocumentsTest, DeleteDocumentsTest, WriteDocumentsTest + + +from pgvector_haystack.document_store import PGvectorDocumentStore + + +class TestDocumentStore(CountDocumentsTest, DeleteDocumentsTest, WriteDocumentsTest): + + @pytest.fixture + def docstore(self) -> PGvectorDocumentStore: + """ + This is the most basic requirement for the child class: provide + an instance of this document store so the base class can use it. + """ + return PGvectorDocumentStore() # FIXME