From b8b42ccbc5323f4489c2e5436d0c571d779d5397 Mon Sep 17 00:00:00 2001 From: Jan Chorowski Date: Fri, 29 Mar 2024 18:50:39 +0100 Subject: [PATCH] community[minor]: Pathway vectorstore(#14859) - **Description:** Integration with pathway.com data processing pipeline acting as an always updated vectorstore - **Issue:** not applicable - **Dependencies:** optional dependency on [`pathway`](https://pypi.org/project/pathway/) - **Twitter handle:** pathway_com The PR provides and integration with `pathway` to provide an easy to use always updated vector store: ```python import pathway as pw from langchain.embeddings.openai import OpenAIEmbeddings from langchain.text_splitter import CharacterTextSplitter from langchain.vectorstores import PathwayVectorClient, PathwayVectorServer data_sources = [] data_sources.append( pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True)) text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0) embeddings_model = OpenAIEmbeddings(openai_api_key=os.environ["OPENAI_API_KEY"]) vector_server = PathwayVectorServer( *data_sources, embedder=embeddings_model, splitter=text_splitter, ) vector_server.run_server(host="127.0.0.1", port="8765", threaded=True, with_cache=False) client = PathwayVectorClient( host="127.0.0.1", port="8765", ) query = "What is Pathway?" docs = client.similarity_search(query) ``` The `PathwayVectorServer` builds a data processing pipeline which continusly scans documents in a given source connector (google drive, s3, ...) and builds a vector store. The `PathwayVectorClient` implements LangChain's `VectorStore` interface and connects to the server to retrieve documents. --------- Co-authored-by: Mateusz Lewandowski Co-authored-by: mlewandowski Co-authored-by: Berke Co-authored-by: Adrian Kosowski Co-authored-by: mlewandowski Co-authored-by: berkecanrizai <63911408+berkecanrizai@users.noreply.github.com> Co-authored-by: Erick Friis Co-authored-by: Harrison Chase Co-authored-by: Bagatur Co-authored-by: mlewandowski Co-authored-by: Szymon Dudycz Co-authored-by: Szymon Dudycz Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com> --- .../integrations/vectorstores/pathway.ipynb | 191 +++++++++++++++ .../vectorstores/__init__.py | 1 + .../vectorstores/pathway.py | 228 ++++++++++++++++++ .../unit_tests/vectorstores/test_imports.py | 1 + .../vectorstores/test_public_api.py | 1 + 5 files changed, 422 insertions(+) create mode 100644 docs/docs/integrations/vectorstores/pathway.ipynb create mode 100644 libs/community/langchain_community/vectorstores/pathway.py diff --git a/docs/docs/integrations/vectorstores/pathway.ipynb b/docs/docs/integrations/vectorstores/pathway.ipynb new file mode 100644 index 0000000000000..9664f0386d2f4 --- /dev/null +++ b/docs/docs/integrations/vectorstores/pathway.ipynb @@ -0,0 +1,191 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Pathway\n", + "> [Pathway](https://pathway.com/) is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.\n", + "\n", + "This notebook demonstrates how to use a live `Pathway` data indexing pipeline with `Langchain`. You can query the results of this pipeline from your chains in the same manner as you would a regular vector store. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.\n", + "\n", + "In this notebook, we will use a [public demo document processing pipeline](https://pathway.com/solutions/ai-pipelines#try-it-out) that:\n", + "\n", + "1. Monitors several cloud data sources for data changes.\n", + "2. Builds a vector index for the data.\n", + "\n", + "To have your own document processing pipeline check the [hosted offering](https://pathway.com/solutions/ai-pipelines) or [build your own](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/).\n", + "\n", + "We will connect to the index using a `VectorStore` client, which implements the `similarity_search` function to retrieve matching documents.\n", + "\n", + "The basic pipeline used in this document allows to effortlessly build a simple vector index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like able operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Querying the data pipeline" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To instantiate and configure the client you need to provide either the `url` or the `host` and `port` of your document indexing pipeline. In the code below we use a publicly available [demo pipeline](https://pathway.com/solutions/ai-pipelines#try-it-out), which REST API you can access at `https://demo-document-indexing.pathway.stream`. This demo ingests documents from [Google Drive](https://drive.google.com/drive/u/0/folders/1cULDv2OaViJBmOfG5WB0oWcgayNrGtVs) and [Sharepoint](https://navalgo.sharepoint.com/sites/ConnectorSandbox/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FConnectorSandbox%2FShared%20Documents%2FIndexerSandbox&p=true&ga=1) and maintains an index for retrieving documents." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from langchain_community.vectorstores import PathwayVectorClient\n", + "\n", + "client = PathwayVectorClient(url=\"https://demo-document-indexing.pathway.stream\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + " And we can start asking queries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query = \"What is Pathway?\"\n", + "docs = client.similarity_search(query)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(docs[0].page_content)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + " **Your turn!** [Get your pipeline](https://pathway.com/solutions/ai-pipelines) or upload [new documents](https://chat-realtime-sharepoint-gdrive.demo.pathway.com/) to the demo pipeline and retry the query!" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Filtering based on file metadata\n", + "\n", + "We support document filtering using [jmespath](https://jmespath.org/) expressions, for instance:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# take into account only sources modified later than unix timestamp\n", + "docs = client.similarity_search(query, metadata_filter=\"modified_at >= `1702672093`\")\n", + "\n", + "# take into account only sources modified later than unix timestamp\n", + "docs = client.similarity_search(query, metadata_filter=\"owner == `james`\")\n", + "\n", + "# take into account only sources with path containing 'repo_readme'\n", + "docs = client.similarity_search(query, metadata_filter=\"contains(path, 'repo_readme')\")\n", + "\n", + "# and of two conditions\n", + "docs = client.similarity_search(\n", + " query, metadata_filter=\"owner == `james` && modified_at >= `1702672093`\"\n", + ")\n", + "\n", + "# or of two conditions\n", + "docs = client.similarity_search(\n", + " query, metadata_filter=\"owner == `james` || modified_at >= `1702672093`\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Getting information on indexed files" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + " `PathwayVectorClient.get_vectorstore_statistics()` gives essential statistics on the state of the vector store, like the number of indexed files and the timestamp of last updated one. You can use it in your chains to tell the user how fresh is your knowledge base." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client.get_vectorstore_statistics()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Your own pipeline" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Running in production\n", + "To have your own Pathway data indexing pipeline check the Pathway's offer for [hosted pipelines](https://pathway.com/solutions/ai-pipelines). You can also run your own Pathway pipeline - for information on how to build the pipeline refer to [Pathway guide](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Processing documents\n", + "\n", + "The vectorization pipeline supports pluggable components for parsing, splitting and embedding documents. For embedding and splitting you can use [Langchain components](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/#langchain) or check [embedders](https://pathway.com/developers/api-docs/pathway-xpacks-llm/embedders) and [splitters](https://pathway.com/developers/api-docs/pathway-xpacks-llm/splitters) available in Pathway. If parser is not provided, it defaults to `UTF-8` parser. You can find available parsers [here](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/parser.py)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.8" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/libs/community/langchain_community/vectorstores/__init__.py b/libs/community/langchain_community/vectorstores/__init__.py index b480a7f8a86c1..c83685d462b35 100644 --- a/libs/community/langchain_community/vectorstores/__init__.py +++ b/libs/community/langchain_community/vectorstores/__init__.py @@ -79,6 +79,7 @@ "Neo4jVector": "langchain_community.vectorstores.neo4j_vector", "NeuralDBVectorStore": "langchain_community.vectorstores.thirdai_neuraldb", "OpenSearchVectorSearch": "langchain_community.vectorstores.opensearch_vector_search", # noqa: E501 + "PathwayVectorClient": "langchain_community.vectorstores.pathway", "PGEmbedding": "langchain_community.vectorstores.pgembedding", "PGVector": "langchain_community.vectorstores.pgvector", "Pinecone": "langchain_community.vectorstores.pinecone", diff --git a/libs/community/langchain_community/vectorstores/pathway.py b/libs/community/langchain_community/vectorstores/pathway.py new file mode 100644 index 0000000000000..cc178935bdc22 --- /dev/null +++ b/libs/community/langchain_community/vectorstores/pathway.py @@ -0,0 +1,228 @@ +""" +Pathway Vector Store client. + + +The Pathway Vector Server is a pipeline written in the Pathway framweork which indexes +all files in a given folder, embeds them, and builds a vector index. The pipeline reacts +to changes in source files, automatically updating appropriate index entries. + +The PathwayVectorClient implements the LangChain VectorStore interface and queries the +PathwayVectorServer to retrieve up-to-date documents. + +You can use the client with managed instances of Pathway Vector Store, or run your own +instance as described at https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/ + +""" + +import json +import logging +from typing import Any, Callable, Iterable, List, Optional, Tuple + +import requests +from langchain_core.documents import Document +from langchain_core.embeddings import Embeddings +from langchain_core.vectorstores import VectorStore + + +# Copied from https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/vector_store.py +# to remove dependency on Pathway library. +class _VectorStoreClient: + def __init__( + self, + host: Optional[str] = None, + port: Optional[int] = None, + url: Optional[str] = None, + ): + """ + A client you can use to query :py:class:`VectorStoreServer`. + + Please provide aither the `url`, or `host` and `port`. + + Args: + - host: host on which `:py:class:`VectorStoreServer` listens + - port: port on which `:py:class:`VectorStoreServer` listens + - url: url at which `:py:class:`VectorStoreServer` listens + """ + err = "Either (`host` and `port`) or `url` must be provided, but not both." + if url is not None: + if host or port: + raise ValueError(err) + self.url = url + else: + if host is None: + raise ValueError(err) + port = port or 80 + self.url = f"http://{host}:{port}" + + def query( + self, query: str, k: int = 3, metadata_filter: Optional[str] = None + ) -> List[dict]: + """ + Perform a query to the vector store and fetch results. + + Args: + - query: + - k: number of documents to be returned + - metadata_filter: optional string representing the metadata filtering query + in the JMESPath format. The search will happen only for documents + satisfying this filtering. + """ + + data = {"query": query, "k": k} + if metadata_filter is not None: + data["metadata_filter"] = metadata_filter + url = self.url + "/v1/retrieve" + response = requests.post( + url, + data=json.dumps(data), + headers={"Content-Type": "application/json"}, + timeout=3, + ) + responses = response.json() + return sorted(responses, key=lambda x: x["dist"]) + + # Make an alias + __call__ = query + + def get_vectorstore_statistics(self) -> dict: + """Fetch basic statistics about the vector store.""" + + url = self.url + "/v1/statistics" + response = requests.post( + url, + json={}, + headers={"Content-Type": "application/json"}, + ) + responses = response.json() + return responses + + def get_input_files( + self, + metadata_filter: Optional[str] = None, + filepath_globpattern: Optional[str] = None, + ) -> list: + """ + Fetch information on documents in the the vector store. + + Args: + metadata_filter: optional string representing the metadata filtering query + in the JMESPath format. The search will happen only for documents + satisfying this filtering. + filepath_globpattern: optional glob pattern specifying which documents + will be searched for this query. + """ + url = self.url + "/v1/inputs" + response = requests.post( + url, + json={ + "metadata_filter": metadata_filter, + "filepath_globpattern": filepath_globpattern, + }, + headers={"Content-Type": "application/json"}, + ) + responses = response.json() + return responses + + +class PathwayVectorClient(VectorStore): + """ + VectorStore connecting to Pathway Vector Store. + """ + + def __init__( + self, + host: Optional[str] = None, + port: Optional[int] = None, + url: Optional[str] = None, + ) -> None: + """ + A client you can use to query Pathway Vector Store. + + Please provide aither the `url`, or `host` and `port`. + + Args: + - host: host on which Pathway Vector Store listens + - port: port on which Pathway Vector Store listens + - url: url at which Pathway Vector Store listens + """ + self.client = _VectorStoreClient(host, port, url) + + def add_texts( + self, + texts: Iterable[str], + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> List[str]: + """Pathway is not suitable for this method.""" + raise NotImplementedError( + "Pathway vector store does not support adding or removing texts" + " from client." + ) + + @classmethod + def from_texts( + cls, + texts: List[str], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> "PathwayVectorClient": + raise NotImplementedError( + "Pathway vector store does not support initializing from_texts." + ) + + def similarity_search( + self, query: str, k: int = 4, **kwargs: Any + ) -> List[Document]: + metadata_filter = kwargs.pop("metadata_filter", None) + if kwargs: + logging.warning( + "Unknown kwargs passed to PathwayVectorClient.similarity_search: %s", + kwargs, + ) + rets = self.client(query=query, k=k, metadata_filter=metadata_filter) + + return [ + Document(page_content=ret["text"], metadata=ret["metadata"]) for ret in rets + ] + + def similarity_search_with_score( + self, + query: str, + k: int = 4, + metadata_filter: Optional[str] = None, + ) -> List[Tuple[Document, float]]: + """Run similarity search with Pathway with distance. + + Args: + - query (str): Query text to search for. + - k (int): Number of results to return. Defaults to 4. + - metadata_filter (Optional[str]): Filter by metadata. + Filtering query should be in JMESPath format. Defaults to None. + + Returns: + List[Tuple[Document, float]]: List of documents most similar to + the query text and cosine distance in float for each. + Lower score represents more similarity. + """ + rets = self.client(query=query, k=k, metadata_filter=metadata_filter) + + return [ + (Document(page_content=ret["text"], metadata=ret["metadata"]), ret["dist"]) + for ret in rets + ] + + def _select_relevance_score_fn(self) -> Callable[[float], float]: + return self._cosine_relevance_score_fn + + def get_vectorstore_statistics(self) -> dict: + """Fetch basic statistics about the Vector Store.""" + return self.client.get_vectorstore_statistics() + + def get_input_files( + self, + metadata_filter: Optional[str] = None, + filepath_globpattern: Optional[str] = None, + ) -> list: + """List files indexed by the Vector Store.""" + return self.client.get_input_files(metadata_filter, filepath_globpattern) diff --git a/libs/community/tests/unit_tests/vectorstores/test_imports.py b/libs/community/tests/unit_tests/vectorstores/test_imports.py index 2b4657acfb280..0a8eb0f8c0cba 100644 --- a/libs/community/tests/unit_tests/vectorstores/test_imports.py +++ b/libs/community/tests/unit_tests/vectorstores/test_imports.py @@ -10,6 +10,7 @@ def test_all_imports() -> None: "AlibabaCloudOpenSearchSettings", "ClickhouseSettings", "MyScaleSettings", + "PathwayVectorClient", "DistanceStrategy", "KineticaSettings", ]: diff --git a/libs/community/tests/unit_tests/vectorstores/test_public_api.py b/libs/community/tests/unit_tests/vectorstores/test_public_api.py index c2007f111363e..474165141365a 100644 --- a/libs/community/tests/unit_tests/vectorstores/test_public_api.py +++ b/libs/community/tests/unit_tests/vectorstores/test_public_api.py @@ -57,6 +57,7 @@ "OpenSearchVectorSearch", "PGEmbedding", "PGVector", + "PathwayVectorClient", "Pinecone", "Qdrant", "Redis",