Skip to content

Commit

Permalink
Pathway vectorstore and rag-pathway template
Browse files Browse the repository at this point in the history
---------

Co-authored-by: mlewandowski <[email protected]>
Co-authored-by: Berke <[email protected]>
Co-authored-by: Jan Chorowski <[email protected]>
Co-authored-by: Adrian Kosowski <[email protected]>
  • Loading branch information
5 people committed Dec 18, 2023
1 parent 4855964 commit 51c0b07
Show file tree
Hide file tree
Showing 17 changed files with 5,102 additions and 4 deletions.
297 changes: 297 additions & 0 deletions docs/docs/integrations/vectorstores/pathway.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
" # Pathway\n",
" >[Pathway](https://pathway.com/) is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.\n",
"\n",
" This notebook shows how to use the `Pathway` to deploy a live data indexing pipeline which can be queried from your chains just like a regular vector store. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.\n",
"\n",
" In this notebook, we will use a simple document processing pipeline that:\n",
" 1. Monitors several data sources (files, S3 folders, cloud storages) for data changes.\n",
" 2. Parses, splits and embeds the documents using `LangChain` methods.\n",
" 3. Builds a vector index for the data.\n",
"\n",
" We will connect to the index using a `VectorStore` client, which implements the `similarity_search` function to retrieve matching documents.\n",
"\n",
" The basic pipeline described in this document allows to effortlessly build a simple vector index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like able operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors.\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" # Prerequisites\n",
" Install the `pathway` package. To use the most powerful, `unstructured.io` based parser also install unstructured package."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# # !pip install pathway\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" # Building the data pipeline\n",
"\n",
" First, make suer you have an API key ith an LLM provider such as OpenAI."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"\n",
"if \"OPENAI_API_KEY\" not in os.environ:\n",
" os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"OpenAI API Key:\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" We will now assemble the data vectorization pipeline, using a simple `UTF8` file parser, `CharacterTextSplitter`, and `OpenAIEmbeddings`.\n",
"\n",
" First, we define the data sources. For simplicity we use the files-based one, but any supported `pathway` [connector](https://pathway.com/developers/api-docs/pathway-io/), such as [s3](https://pathway.com/developers/api-docs/pathway-io-s3/) or [google drive](https://pathway.com/developers/api-docs/pathway-io-gdrive/#pathway.io.gdrive.read) will work too.\n",
"\n",
" Then, we now define the `LangChain` components that define how the data will be processed.\n",
"\n",
" Last, we assemble the data pipeline. We will start it running in a background thread, to be able to query it immediately from the demonstration. Please note that in a production deployment, the server will be running in another process, possibly on another machine. For the quick-start, we keep the server and client as different threads of the same python process."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pathway as pw\n",
"from langchain.embeddings.openai import OpenAIEmbeddings\n",
"from langchain.text_splitter import CharacterTextSplitter\n",
"from langchain.vectorstores import PathwayVectorServer\n",
"\n",
"# This creates a `pathway` connector that reads a single file.\n",
"data_sources = []\n",
"data_sources.append(\n",
" pw.io.fs.read(\n",
" \"../../templates/rag-pathway/sample_documents\",\n",
" format=\"binary\",\n",
" mode=\"streaming\",\n",
" with_metadata=True,\n",
" )\n",
")\n",
"\n",
"# This creates a connector that tracks files in Google drive.\n",
"# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials\n",
"# data_sources.append(\n",
"# pw.io.gdrive.read(object_id=\"17H4YpBOAKQzEJ93xmC2z170l0bP2npMy\", service_user_credentials_file=\"credentials.json\", with_metadata=True))\n",
"\n",
"# Choose proper LangChain document transformers\n",
"text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
"embeddings_model = OpenAIEmbeddings(openai_api_key=os.environ[\"OPENAI_API_KEY\"])\n",
"\n",
"# The PathwayVectorServer is a wrapper over pathway.xpacks.llm.vector_store to accept LangChain transforments\n",
"# Fell free to fork it to develop bespoke document processing pipelines.\n",
"vector_server = PathwayVectorServer(\n",
" *data_sources,\n",
" embedder=embeddings_model,\n",
" splitter=text_splitter,\n",
")\n",
"vector_server.run_server(host=\"127.0.0.1\", port=\"8765\", threaded=True, with_cache=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" We now instantiate and configure the client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.vectorstores import PathwayVectorClient\n",
"\n",
"client = PathwayVectorClient(\n",
" host=\"127.0.0.1\",\n",
" port=\"8765\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" And we can start asking queries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"query = \"What is Pathway?\"\n",
"docs = client.similarity_search(query)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(docs[0].page_content)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" **Your turn!** Now make a change to the source documents or make a fresh one and retry the query!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" ## Using `unstructured.io` document parser\n",
"\n",
" The vectorization pipeline supports pluggable parsers. The default parser simply decodes files as UTF8. However, we also provide an https://unstructured.io/ parser that can extract text out of many kinds of documents:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# # !pip install unstructured[all-docs] # if you will need to parse complex documents\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathway.xpacks.llm import parsers\n",
"\n",
"vector_server = PathwayVectorServer(\n",
" *data_sources,\n",
" parser=parsers.ParseUnstructured(),\n",
" embedder=embeddings_model,\n",
" splitter=text_splitter,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" ## Getting information on indexed files"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" `PathwayVectorClient.get_vectorstore_statistics()` gives essential statistics on the state of the vector store, like the number of indexed files and the timestamp of last updated one. You can use it in your chains to tell the user how fresh is your knowledge base."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.get_vectorstore_statistics()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" ## Filtering based on file metadata\n",
"\n",
" We support document filtering using [jmespath](https://jmespath.org/) expressions, for instance:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# take into account only sources modified later than unix timestamp\n",
"docs = client.similarity_search(query, metadata_filter=\"modified_at >= `1702672093`\")\n",
"\n",
"# take into account only sources modified later than unix timestamp\n",
"docs = client.similarity_search(query, metadata_filter=\"owner == `james`\")\n",
"\n",
"# take into account only sources with path containing 'repo_readme'\n",
"docs = client.similarity_search(query, metadata_filter=\"contains(path, 'repo_readme')\")\n",
"\n",
"# and of two conditions\n",
"docs = client.similarity_search(\n",
" query, metadata_filter=\"owner == `james` && modified_at >= `1702672093`\"\n",
")\n",
"\n",
"# or of two conditions\n",
"docs = client.similarity_search(\n",
" query, metadata_filter=\"owner == `james` || modified_at >= `1702672093`\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" # Running in production\n",
"\n",
" ## Running in a separate process\n",
" A production deployment will typically run the server in a separate process. We provide a template application under [`templates`](https://github.com/langchain-ai/langchain/tree/master/templates/rag-pathway). We recommend to run the Pathway data indexing pipeline in a container-based deployment environment, such as Docker or Kubernetes. For more info see [Pathway's deployment guide](https://pathway.com/developers/user-guide/deployment/).\n",
"\n",
" ## Configuring the cache\n",
" The Pathway vectorizing pipeline comes with an embeddings cache:\n",
" ```python\n",
" vector_server.run_server(..., with_cache=True)\n",
" ```\n",
"\n",
" The default cache configuration is the locally hosted disk cache, stored in the `./Cache` directory. However, it is possible to customize it by explicitly specifying the caching backend that can be chosen among several persistent backends [options](https://pathway.com/developers/api-docs/persistence-api/#pathway.persistence.Backend)."
]
}
],
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": 3
}
},
"nbformat": 4,
"nbformat_minor": 2
}
18 changes: 18 additions & 0 deletions libs/community/langchain_community/vectorstores/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,18 @@ def _import_opensearch_vector_search() -> Any:
return OpenSearchVectorSearch


def _import_pathway_vector_client() -> Any:
from langchain_community.vectorstores.pathway import PathwayVectorClient

return PathwayVectorClient


def _import_pathway_vector_server() -> Any:
from langchain_community.vectorstores.pathway import PathwayVectorServer

return PathwayVectorServer


def _import_pgembedding() -> Any:
from langchain_community.vectorstores.pgembedding import PGEmbedding

Expand Down Expand Up @@ -525,6 +537,10 @@ def __getattr__(name: str) -> Any:
return _import_neo4j_vector()
elif name == "OpenSearchVectorSearch":
return _import_opensearch_vector_search()
elif name == "PathwayVectorClient":
return _import_pathway_vector_client()
elif name == "PathwayVectorServer":
return _import_pathway_vector_server()
elif name == "PGEmbedding":
return _import_pgembedding()
elif name == "PGVector":
Expand Down Expand Up @@ -624,6 +640,8 @@ def __getattr__(name: str) -> Any:
"MyScaleSettings",
"Neo4jVector",
"OpenSearchVectorSearch",
"PathwayVectorClient",
"PathwayVectorServer",
"PGEmbedding",
"PGVector",
"Pinecone",
Expand Down
Loading

0 comments on commit 51c0b07

Please sign in to comment.