-
Notifications
You must be signed in to change notification settings - Fork 15.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
community[minor]: Pathway vectorstore(#14859)
- **Description:** Integration with pathway.com data processing pipeline acting as an always updated vectorstore - **Issue:** not applicable - **Dependencies:** optional dependency on [`pathway`](https://pypi.org/project/pathway/) - **Twitter handle:** pathway_com The PR provides and integration with `pathway` to provide an easy to use always updated vector store: ```python import pathway as pw from langchain.embeddings.openai import OpenAIEmbeddings from langchain.text_splitter import CharacterTextSplitter from langchain.vectorstores import PathwayVectorClient, PathwayVectorServer data_sources = [] data_sources.append( pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True)) text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0) embeddings_model = OpenAIEmbeddings(openai_api_key=os.environ["OPENAI_API_KEY"]) vector_server = PathwayVectorServer( *data_sources, embedder=embeddings_model, splitter=text_splitter, ) vector_server.run_server(host="127.0.0.1", port="8765", threaded=True, with_cache=False) client = PathwayVectorClient( host="127.0.0.1", port="8765", ) query = "What is Pathway?" docs = client.similarity_search(query) ``` The `PathwayVectorServer` builds a data processing pipeline which continusly scans documents in a given source connector (google drive, s3, ...) and builds a vector store. The `PathwayVectorClient` implements LangChain's `VectorStore` interface and connects to the server to retrieve documents. --------- Co-authored-by: Mateusz Lewandowski <[email protected]> Co-authored-by: mlewandowski <[email protected]> Co-authored-by: Berke <[email protected]> Co-authored-by: Adrian Kosowski <[email protected]> Co-authored-by: mlewandowski <[email protected]> Co-authored-by: berkecanrizai <[email protected]> Co-authored-by: Erick Friis <[email protected]> Co-authored-by: Harrison Chase <[email protected]> Co-authored-by: Bagatur <[email protected]> Co-authored-by: mlewandowski <[email protected]> Co-authored-by: Szymon Dudycz <[email protected]> Co-authored-by: Szymon Dudycz <[email protected]> Co-authored-by: Bagatur <[email protected]>
- Loading branch information
Showing
5 changed files
with
422 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"# Pathway\n", | ||
"> [Pathway](https://pathway.com/) is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.\n", | ||
"\n", | ||
"This notebook demonstrates how to use a live `Pathway` data indexing pipeline with `Langchain`. You can query the results of this pipeline from your chains in the same manner as you would a regular vector store. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.\n", | ||
"\n", | ||
"In this notebook, we will use a [public demo document processing pipeline](https://pathway.com/solutions/ai-pipelines#try-it-out) that:\n", | ||
"\n", | ||
"1. Monitors several cloud data sources for data changes.\n", | ||
"2. Builds a vector index for the data.\n", | ||
"\n", | ||
"To have your own document processing pipeline check the [hosted offering](https://pathway.com/solutions/ai-pipelines) or [build your own](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/).\n", | ||
"\n", | ||
"We will connect to the index using a `VectorStore` client, which implements the `similarity_search` function to retrieve matching documents.\n", | ||
"\n", | ||
"The basic pipeline used in this document allows to effortlessly build a simple vector index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like able operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors.\n" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"## Querying the data pipeline" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"To instantiate and configure the client you need to provide either the `url` or the `host` and `port` of your document indexing pipeline. In the code below we use a publicly available [demo pipeline](https://pathway.com/solutions/ai-pipelines#try-it-out), which REST API you can access at `https://demo-document-indexing.pathway.stream`. This demo ingests documents from [Google Drive](https://drive.google.com/drive/u/0/folders/1cULDv2OaViJBmOfG5WB0oWcgayNrGtVs) and [Sharepoint](https://navalgo.sharepoint.com/sites/ConnectorSandbox/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FConnectorSandbox%2FShared%20Documents%2FIndexerSandbox&p=true&ga=1) and maintains an index for retrieving documents." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from langchain_community.vectorstores import PathwayVectorClient\n", | ||
"\n", | ||
"client = PathwayVectorClient(url=\"https://demo-document-indexing.pathway.stream\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
" And we can start asking queries" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"query = \"What is Pathway?\"\n", | ||
"docs = client.similarity_search(query)" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"print(docs[0].page_content)" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
" **Your turn!** [Get your pipeline](https://pathway.com/solutions/ai-pipelines) or upload [new documents](https://chat-realtime-sharepoint-gdrive.demo.pathway.com/) to the demo pipeline and retry the query!" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"## Filtering based on file metadata\n", | ||
"\n", | ||
"We support document filtering using [jmespath](https://jmespath.org/) expressions, for instance:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# take into account only sources modified later than unix timestamp\n", | ||
"docs = client.similarity_search(query, metadata_filter=\"modified_at >= `1702672093`\")\n", | ||
"\n", | ||
"# take into account only sources modified later than unix timestamp\n", | ||
"docs = client.similarity_search(query, metadata_filter=\"owner == `james`\")\n", | ||
"\n", | ||
"# take into account only sources with path containing 'repo_readme'\n", | ||
"docs = client.similarity_search(query, metadata_filter=\"contains(path, 'repo_readme')\")\n", | ||
"\n", | ||
"# and of two conditions\n", | ||
"docs = client.similarity_search(\n", | ||
" query, metadata_filter=\"owner == `james` && modified_at >= `1702672093`\"\n", | ||
")\n", | ||
"\n", | ||
"# or of two conditions\n", | ||
"docs = client.similarity_search(\n", | ||
" query, metadata_filter=\"owner == `james` || modified_at >= `1702672093`\"\n", | ||
")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"## Getting information on indexed files" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
" `PathwayVectorClient.get_vectorstore_statistics()` gives essential statistics on the state of the vector store, like the number of indexed files and the timestamp of last updated one. You can use it in your chains to tell the user how fresh is your knowledge base." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"client.get_vectorstore_statistics()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"## Your own pipeline" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"### Running in production\n", | ||
"To have your own Pathway data indexing pipeline check the Pathway's offer for [hosted pipelines](https://pathway.com/solutions/ai-pipelines). You can also run your own Pathway pipeline - for information on how to build the pipeline refer to [Pathway guide](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/)." | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"### Processing documents\n", | ||
"\n", | ||
"The vectorization pipeline supports pluggable components for parsing, splitting and embedding documents. For embedding and splitting you can use [Langchain components](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/#langchain) or check [embedders](https://pathway.com/developers/api-docs/pathway-xpacks-llm/embedders) and [splitters](https://pathway.com/developers/api-docs/pathway-xpacks-llm/splitters) available in Pathway. If parser is not provided, it defaults to `UTF-8` parser. You can find available parsers [here](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/parser.py)." | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [] | ||
} | ||
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": "Python 3 (ipykernel)", | ||
"language": "python", | ||
"name": "python3" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.11.8" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 4 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.