Pathway vectorstore and rag-pathway template #14859

Merged
merged 42 commits into from
Mar 29, 2024
42 commits
51c0b07
Pathway vectorstore and rag-pathway template
lewymati Dec 18, 2023
51944ea
fix imports (#6)
lewymati Dec 19, 2023
3a6a5f0
Berke/fix docs (#7)
berkecanrizai Dec 19, 2023
17e1842
Merge branch 'langchain-ai:master' into master
lewymati Dec 19, 2023
48d3371
revert pyproject/poetry.lock
Dec 19, 2023
3097d7b
optional
Dec 19, 2023
f6f4f25
update pyproject and lockfile
Dec 19, 2023
b6168cb
Merge branch 'master' into master
lewymati Dec 19, 2023
7a6d18c
update poetry.lock hash
Dec 19, 2023
74bff19
Merge branch 'master' into master
lewymati Dec 20, 2023
e0558ed
merge
efriis Dec 21, 2023
8e765a2
Add newline required by Ruff.
janchorowski Dec 21, 2023
0cd643b
fix docstring
Dec 22, 2023
d33df74
cr
hwchase17 Dec 22, 2023
3e41ae9
cr
hwchase17 Dec 22, 2023
7531e5d
scores for pathway vectorstore + self-query retrieval (#8)
lewymati Dec 23, 2023
a8003ec
feat: documentation notebook item ordering
berkecanrizai Dec 26, 2023
2f7e7d4
fix: typo
berkecanrizai Dec 26, 2023
aa1f370
Fix formatting.
janchorowski Dec 27, 2023
8ae6a56
Remove imports from outside langchain_community.
janchorowski Jan 2, 2024
de04c4d
Merge remote-tracking branch 'upstream/master'
janchorowski Jan 2, 2024
45e54f7
ruff formatting
janchorowski Jan 3, 2024
e0ce01a
Merge remote-tracking branch 'upstream/master'
janchorowski Jan 3, 2024
b941e00
Merge remote-tracking branch 'upstream/master'
janchorowski Jan 8, 2024
e090312
Merge remote-tracking branch 'upstream/master'
janchorowski Jan 14, 2024
61cddf2
Merge branch 'master' into janchorowski/master
baskaryan Jan 17, 2024
d4ab492
cr
baskaryan Jan 17, 2024
2dd1078
fix self query test to be immune to different timezones
Jan 18, 2024
ba7a1e6
Merge remote-tracking branch 'upstream/master'
janchorowski Jan 19, 2024
eef269c
Remove self-query
janchorowski Jan 19, 2024
9f10171
Merge remote-tracking branch 'upstream/master'
janchorowski Jan 26, 2024
bc1c4cc
Merge remote-tracking branch 'upstream/master'
janchorowski Jan 31, 2024
6bb6cd8
Add README instructions on running the standalone server
janchorowski Jan 31, 2024
ce5d37f
Remove VectorStoreServer from integration (#10)
szymondudycz Mar 11, 2024
c374daf
Fix docstring
janchorowski Mar 11, 2024
200be9b
Merge branch 'master' into master
janchorowski Mar 11, 2024
c276d26
Merge remote-tracking branch 'upstream/master'
janchorowski Mar 12, 2024
cf7ad98
linters
janchorowski Mar 12, 2024
8bc4cd0
Merge branch 'master' into update_imports
szymondudycz Mar 13, 2024
77acfe1
Merge remote-tracking branch 'upstream/master' into update_imports
janchorowski Mar 29, 2024
f904e54
Ruff lint
janchorowski Mar 29, 2024
196147f
Merge branch 'master' into update_imports
baskaryan Mar 29, 2024
297 changes: 297 additions & 0 deletions docs/docs/integrations/vectorstores/pathway.ipynb
@@ -0,0 +1,297 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
" # Pathway\n",
" >[Pathway](https://pathway.com/) is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.\n",
"\n",
" This notebook shows how to use the `Pathway` to deploy a live data indexing pipeline which can be queried from your chains just like a regular vector store. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.\n",
"\n",
" In this notebook, we will use a simple document processing pipeline that:\n",
" 1. Monitors several data sources (files, S3 folders, cloud storages) for data changes.\n",
" 2. Parses, splits and embeds the documents using `LangChain` methods.\n",
" 3. Builds a vector index for the data.\n",
"\n",
" We will connect to the index using a `VectorStore` client, which implements the `similarity_search` function to retrieve matching documents.\n",
"\n",
" The basic pipeline described in this document allows to effortlessly build a simple vector index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like able operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors.\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" # Prerequisites\n",
" Install the `pathway` package. To use the most powerful, `unstructured.io` based parser also install unstructured package."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# # !pip install pathway\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" # Building the data pipeline\n",
"\n",
" First, make suer you have an API key ith an LLM provider such as OpenAI."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"import os\n",
"\n",
"if \"OPENAI_API_KEY\" not in os.environ:\n",
" os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"OpenAI API Key:\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" We will now assemble the data vectorization pipeline, using a simple `UTF8` file parser, `CharacterTextSplitter`, and `OpenAIEmbeddings`.\n",
"\n",
" First, we define the data sources. For simplicity we use the files-based one, but any supported `pathway` [connector](https://pathway.com/developers/api-docs/pathway-io/), such as [s3](https://pathway.com/developers/api-docs/pathway-io-s3/) or [google drive](https://pathway.com/developers/api-docs/pathway-io-gdrive/#pathway.io.gdrive.read) will work too.\n",
"\n",
" Then, we now define the `LangChain` components that define how the data will be processed.\n",
"\n",
" Last, we assemble the data pipeline. We will start it running in a background thread, to be able to query it immediately from the demonstration. Please note that in a production deployment, the server will be running in another process, possibly on another machine. For the quick-start, we keep the server and client as different threads of the same python process."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pathway as pw\n",
"from langchain.embeddings.openai import OpenAIEmbeddings\n",
"from langchain.text_splitter import CharacterTextSplitter\n",
"from langchain.vectorstores import PathwayVectorServer\n",
"\n",
"# This creates a `pathway` connector that reads a single file.\n",
"data_sources = []\n",
"data_sources.append(\n",
" pw.io.fs.read(\n",
" \"../../templates/rag-pathway/sample_documents\",\n",
" format=\"binary\",\n",
" mode=\"streaming\",\n",
" with_metadata=True,\n",
" )\n",
")\n",
"\n",
"# This creates a connector that tracks files in Google drive.\n",
"# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials\n",
"# data_sources.append(\n",
"# pw.io.gdrive.read(object_id=\"17H4YpBOAKQzEJ93xmC2z170l0bP2npMy\", service_user_credentials_file=\"credentials.json\", with_metadata=True))\n",
"\n",
"# Choose proper LangChain document transformers\n",
"text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
"embeddings_model = OpenAIEmbeddings(openai_api_key=os.environ[\"OPENAI_API_KEY\"])\n",
"\n",
"# The PathwayVectorServer is a wrapper over pathway.xpacks.llm.vector_store to accept LangChain transforments\n",
"# Fell free to fork it to develop bespoke document processing pipelines.\n",
"vector_server = PathwayVectorServer(\n",
" *data_sources,\n",
" embedder=embeddings_model,\n",
" splitter=text_splitter,\n",
")\n",
"vector_server.run_server(host=\"127.0.0.1\", port=\"8765\", threaded=True, with_cache=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" We now instantiate and configure the client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.vectorstores import PathwayVectorClient\n",
"\n",
"client = PathwayVectorClient(\n",
" host=\"127.0.0.1\",\n",
" port=\"8765\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" And we can start asking queries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"query = \"What is Pathway?\"\n",
"docs = client.similarity_search(query)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(docs[0].page_content)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" **Your turn!** Now make a change to the source documents or make a fresh one and retry the query!"
]
},
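{
"cell_type": "markdown",
"metadata": {},
"source": [
" For instance, here is a minimal sketch of such a check. It drops a new file into the monitored directory (the file name `pathway_note.txt` is made up for this demo) and re-runs the query; since the connector runs in streaming mode, the index picks the file up automatically."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Write a fresh document into the folder monitored by the pipeline above.\n",
"# The file name is arbitrary; any new file in the folder gets indexed.\n",
"with open(\"../../templates/rag-pathway/sample_documents/pathway_note.txt\", \"w\") as f:\n",
"    f.write(\"Pathway is an open data processing framework for live data sources.\")\n",
"\n",
"# Re-run the query; the live index reflects the change without a restart.\n",
"docs = client.similarity_search(query)\n",
"print(docs[0].page_content)"
]
},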
{
"cell_type": "markdown",
"metadata": {},
"source": [
" ## Using `unstructured.io` document parser\n",
"\n",
" The vectorization pipeline supports pluggable parsers. The default parser simply decodes files as UTF8. However, we also provide an https://unstructured.io/ parser that can extract text out of many kinds of documents:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# # !pip install unstructured[all-docs] # if you will need to parse complex documents\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathway.xpacks.llm import parsers\n",
"\n",
"vector_server = PathwayVectorServer(\n",
" *data_sources,\n",
" parser=parsers.ParseUnstructured(),\n",
" embedder=embeddings_model,\n",
" splitter=text_splitter,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" ## Getting information on indexed files"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" `PathwayVectorClient.get_vectorstore_statistics()` gives essential statistics on the state of the vector store, like the number of indexed files and the timestamp of last updated one. You can use it in your chains to tell the user how fresh is your knowledge base."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.get_vectorstore_statistics()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" ## Filtering based on file metadata\n",
"\n",
" We support document filtering using [jmespath](https://jmespath.org/) expressions, for instance:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# take into account only sources modified later than unix timestamp\n",
"docs = client.similarity_search(query, metadata_filter=\"modified_at >= `1702672093`\")\n",
"\n",
"# take into account only sources modified later than unix timestamp\n",
"docs = client.similarity_search(query, metadata_filter=\"owner == `james`\")\n",
"\n",
"# take into account only sources with path containing 'repo_readme'\n",
"docs = client.similarity_search(query, metadata_filter=\"contains(path, 'repo_readme')\")\n",
"\n",
"# and of two conditions\n",
"docs = client.similarity_search(\n",
" query, metadata_filter=\"owner == `james` && modified_at >= `1702672093`\"\n",
")\n",
"\n",
"# or of two conditions\n",
"docs = client.similarity_search(\n",
" query, metadata_filter=\"owner == `james` || modified_at >= `1702672093`\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" # Running in production\n",
"\n",
" ## Running in a separate process\n",
" A production deployment will typically run the server in a separate process. We provide a template application under [`templates`](https://github.com/langchain-ai/langchain/tree/master/templates/rag-pathway). We recommend to run the Pathway data indexing pipeline in a container-based deployment environment, such as Docker or Kubernetes. For more info see [Pathway's deployment guide](https://pathway.com/developers/user-guide/deployment/).\n",
"\n",
" ## Configuring the cache\n",
" The Pathway vectorizing pipeline comes with an embeddings cache:\n",
" ```python\n",
" vector_server.run_server(..., with_cache=True)\n",
" ```\n",
"\n",
" The default cache configuration is the locally hosted disk cache, stored in the `./Cache` directory. However, it is possible to customize it by explicitly specifying the caching backend that can be chosen among several persistent backends [options](https://pathway.com/developers/api-docs/persistence-api/#pathway.persistence.Backend)."
]
}
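,
{
"cell_type": "markdown",
"metadata": {},
"source": [
" Here is a minimal sketch of such a customization, assuming `run_server` forwards a `cache_backend` argument to Pathway's persistence layer (check the API docs linked above for the exact parameter name):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pathway as pw\n",
"\n",
"# A sketch: persist the embeddings cache in a custom directory.\n",
"# The `cache_backend` keyword is an assumption; see the persistence\n",
"# API docs linked above for the supported backends and parameters.\n",
"vector_server.run_server(\n",
"    host=\"127.0.0.1\",\n",
"    port=\"8765\",\n",
"    threaded=True,\n",
"    with_cache=True,\n",
"    cache_backend=pw.persistence.Backend.filesystem(\"./CustomCache\"),\n",
")"
]
}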
],
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": 3
}
},
"nbformat": 4,
"nbformat_minor": 2
}
18 changes: 18 additions & 0 deletions libs/community/langchain_community/vectorstores/__init__.py
@@ -276,6 +276,18 @@ def _import_opensearch_vector_search() -> Any:
return OpenSearchVectorSearch


def _import_pathway_vector_client() -> Any:
from langchain_community.vectorstores.pathway import PathwayVectorClient

return PathwayVectorClient


def _import_pathway_vector_server() -> Any:
from langchain_community.vectorstores.pathway import PathwayVectorServer

return PathwayVectorServer


def _import_pgembedding() -> Any:
from langchain_community.vectorstores.pgembedding import PGEmbedding

@@ -525,6 +537,10 @@ def __getattr__(name: str) -> Any:
return _import_neo4j_vector()
elif name == "OpenSearchVectorSearch":
return _import_opensearch_vector_search()
elif name == "PathwayVectorClient":
return _import_pathway_vector_client()
elif name == "PathwayVectorServer":
return _import_pathway_vector_server()
elif name == "PGEmbedding":
return _import_pgembedding()
elif name == "PGVector":
@@ -624,6 +640,8 @@ def __getattr__(name: str) -> Any:
"MyScaleSettings",
"Neo4jVector",
"OpenSearchVectorSearch",
"PathwayVectorClient",
"PathwayVectorServer",
"PGEmbedding",
"PGVector",
"Pinecone",