diff --git a/docs/docs/integrations/platforms/google.mdx b/docs/docs/integrations/platforms/google.mdx index 350ff563baed8..8caa8c6d61081 100644 --- a/docs/docs/integrations/platforms/google.mdx +++ b/docs/docs/integrations/platforms/google.mdx @@ -202,6 +202,28 @@ See a [usage example](/docs/integrations/vectorstores/matchingengine). from langchain.vectorstores import MatchingEngine ``` +### Google BigQuery Vector Search + +> [Google BigQuery](https://cloud.google.com/bigquery), +> BigQuery is a serverless and cost-effective enterprise data warehouse in Google Cloud. +> +> Google BigQuery Vector Search +> BigQuery vector search lets you use GoogleSQL to do semantic search, using vector indexes for fast but approximate results, or using brute force for exact results. + +> It can calculate Euclidean or Cosine distance. With LangChain, we default to use Euclidean distance. + +We need to install several python packages. + +```bash +pip install google-cloud-bigquery +``` + +See a [usage example](/docs/integrations/vectorstores/bigquery_vector_search). + +```python +from langchain.vectorstores import BigQueryVectorSearch +``` + ### Google ScaNN >[Google ScaNN](https://github.com/google-research/google-research/tree/master/scann) diff --git a/docs/docs/integrations/vectorstores/bigquery_vector_search.ipynb b/docs/docs/integrations/vectorstores/bigquery_vector_search.ipynb new file mode 100644 index 0000000000000..236d23529a6d2 --- /dev/null +++ b/docs/docs/integrations/vectorstores/bigquery_vector_search.ipynb @@ -0,0 +1,353 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "E_RJy7C1bpCT" + }, + "source": [ + "# BigQuery Vector Search\n", + "> **BigQueryVectorSearch**:\n", + "BigQuery vector search lets you use GoogleSQL to do semantic search, using vector indexes for fast approximate results, or using brute force for exact results.\n", + "\n", + "\n", + "This tutorial illustrates how to work with an end-to-end data and embedding management system in LangChain, and provide scalable semantic search in BigQuery." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "EmPJkpOCckyh" + }, + "source": [ + "## Getting started\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "IR54BmgvdHT_" + }, + "source": [ + "### Install the library" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 1000 + }, + "id": "0ZITIDE160OD", + "outputId": "e184bc0d-6541-4e0a-82d2-1e216db00a2d" + }, + "outputs": [], + "source": [ + "! pip install langchain google-cloud-aiplatform google-cloud-bigquery --upgrade --user" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "v40bB_GMcr9f" + }, + "source": [ + "**Colab only:** Uncomment the following cell to restart the kernel or use the button to restart the kernel. For Vertex AI Workbench you can restart the terminal using the button on top." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "6o0iGVIdDD6K" + }, + "outputs": [], + "source": [ + "# # Automatically restart kernel after installs so that your environment can access the new packages\n", + "# import IPython\n", + "\n", + "# app = IPython.Application.instance()\n", + "# app.kernel.do_shutdown(True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Before you begin" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Set your project ID\n", + "\n", + "If you don't know your project ID, try the following:\n", + "* Run `gcloud config list`.\n", + "* Run `gcloud projects list`.\n", + "* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# @title Project { display-mode: \"form\" }\n", + "PROJECT_ID = \"\" # @param {type:\"string\"}\n", + "\n", + "# Set the project id\n", + "! gcloud config set project {PROJECT_ID}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Set the region\n", + "\n", + "You can also change the `REGION` variable used by BigQuery. Learn more about [BigQuery regions](https://cloud.google.com/bigquery/docs/locations#supported_locations)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# @title Region { display-mode: \"form\" }\n", + "REGION = \"US\" # @param {type: \"string\"}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Set the dataset and table names\n", + "\n", + "They will be your BigQuery Vector Store." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# @title Dataset and Table { display-mode: \"form\" }\n", + "DATASET = \"my_langchain_dataset\" # @param {type: \"string\"}\n", + "TABLE = \"doc_and_vectors\" # @param {type: \"string\"}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Authenticating your notebook environment\n", + "\n", + "- If you are using **Colab** to run this notebook, uncomment the cell below and continue.\n", + "- If you are using **Vertex AI Workbench**, check out the setup instructions [here](https://github.com/GoogleCloudPlatform/generative-ai/tree/main/setup-env)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from google.colab import auth as google_auth\n", + "\n", + "google_auth.authenticate_user()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "AD3yG49BdLlr" + }, + "source": [ + "## Demo: BigQueryVectorSearch" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create an embedding class instance\n", + "\n", + "You may need to enable Vertex AI API in your project by running\n", + "`gcloud services enable aiplatform.googleapis.com --project {PROJECT_ID}`\n", + "(replace `{PROJECT_ID}` with the name of your project).\n", + "\n", + "You can use any [LangChain embeddings model](https://python.langchain.com/docs/integrations/text_embedding/)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "Vb2RJocV9_LQ", + "outputId": "37f5dc74-2512-47b2-c135-f34c10afdcf4" + }, + "outputs": [], + "source": [ + "from langchain_community.embeddings import VertexAIEmbeddings\n", + "\n", + "embedding = VertexAIEmbeddings(\n", + " model_name=\"textembedding-gecko@latest\", project=PROJECT_ID\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create BigQuery Dataset\n", + "\n", + "Optional step to create the dataset if it doesn't exist." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from google.cloud import bigquery\n", + "\n", + "client = bigquery.Client(project=PROJECT_ID, location=REGION)\n", + "client.create_dataset(dataset=DATASET, exists_ok=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Initialize BigQueryVectorSearch Vector Store with an existing BigQuery dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.vectorstores.utils import DistanceStrategy\n", + "from langchain_community.vectorstores import BigQueryVectorSearch\n", + "\n", + "store = BigQueryVectorSearch(\n", + " project_id=PROJECT_ID,\n", + " dataset_name=DATASET,\n", + " table_name=TABLE,\n", + " location=REGION,\n", + " embedding=embedding,\n", + " distance_strategy=DistanceStrategy.EUCLIDEAN_DISTANCE,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add texts" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "all_texts = [\"Apples and oranges\", \"Cars and airplanes\", \"Pineapple\", \"Train\", \"Banana\"]\n", + "metadatas = [{\"len\": len(t)} for t in all_texts]\n", + "\n", + "store.add_texts(all_texts, metadatas=metadatas)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Search for documents" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query = \"I'd like a fruit.\"\n", + "docs = store.similarity_search(query)\n", + "print(docs)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Search for documents by vector" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query_vector = embedding.embed_query(query)\n", + "docs = store.similarity_search_by_vector(query_vector, k=2)\n", + "print(docs)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Search for documents with metadata filter" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# This should only return \"Banana\" document.\n", + "docs = store.similarity_search_by_vector(query_vector, filter={\"len\": 6})\n", + "print(docs)" + ] + } + ], + "metadata": { + "colab": { + "provenance": [], + "toc_visible": true + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.0" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/libs/community/langchain_community/vectorstores/__init__.py b/libs/community/langchain_community/vectorstores/__init__.py index 6d490362fa2d6..0ddaafed85703 100644 --- a/libs/community/langchain_community/vectorstores/__init__.py +++ b/libs/community/langchain_community/vectorstores/__init__.py @@ -104,6 +104,14 @@ def _import_baiducloud_vector_search() -> Any: return BESVectorStore +def _import_bigquery() -> Any: + from langchain_community.vectorstores.bigquery_vector_search import ( + BigQueryVectorSearch, + ) + + return BigQueryVectorSearch + + def _import_cassandra() -> Any: from langchain_community.vectorstores.cassandra import Cassandra @@ -473,6 +481,8 @@ def __getattr__(name: str) -> Any: return _import_azuresearch() elif name == "Bagel": return _import_bageldb() + elif name == "BigQueryVectorSearch": + return _import_bigquery() elif name == "BESVectorStore": return _import_baiducloud_vector_search() elif name == "Cassandra": diff --git a/libs/community/langchain_community/vectorstores/bigquery_vector_search.py b/libs/community/langchain_community/vectorstores/bigquery_vector_search.py new file mode 100644 index 0000000000000..d132e7e071a73 --- /dev/null +++ b/libs/community/langchain_community/vectorstores/bigquery_vector_search.py @@ -0,0 +1,835 @@ +"""Vector Store in Google Cloud BigQuery.""" +from __future__ import annotations + +import asyncio +import json +import logging +import sys +import uuid +from datetime import datetime +from functools import partial +from threading import Lock, Thread +from typing import Any, Callable, Dict, List, Optional, Tuple, Type + +import numpy as np +from langchain_core.documents import Document +from langchain_core.embeddings import Embeddings +from langchain_core.vectorstores import VectorStore + +from langchain_community.vectorstores.utils import ( + DistanceStrategy, + maximal_marginal_relevance, +) + +DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.EUCLIDEAN_DISTANCE +DEFAULT_DOC_ID_COLUMN_NAME = "doc_id" # document id +DEFAULT_TEXT_EMBEDDING_COLUMN_NAME = "text_embedding" # embeddings vectors +DEFAULT_METADATA_COLUMN_NAME = "metadata" # document metadata +DEFAULT_CONTENT_COLUMN_NAME = "content" # text content, do not rename +DEFAULT_TOP_K = 4 # default number of documents returned from similarity search + +_INDEX_CHECK_PERIOD_SECONDS = 60 # Do not check for index more often that this. + +_vector_table_lock = Lock() # process-wide BigQueryVectorSearch table lock + + +class BigQueryVectorSearch(VectorStore): + """Google Cloud BigQuery vector store. + + To use, you need the following packages installed: + google-cloud-bigquery + """ + + def __init__( + self, + embedding: Embeddings, + project_id: str, + dataset_name: str, + table_name: str, + location: str = "US", + content_field: str = DEFAULT_CONTENT_COLUMN_NAME, + metadata_field: str = DEFAULT_METADATA_COLUMN_NAME, + text_embedding_field: str = DEFAULT_TEXT_EMBEDDING_COLUMN_NAME, + doc_id_field: str = DEFAULT_DOC_ID_COLUMN_NAME, + distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, + credentials: Optional[Any] = None, + ): + """Constructor for BigQueryVectorSearch. + + Args: + embedding (Embeddings): Text Embedding model to use. + project_id (str): GCP project. + dataset_name (str): BigQuery dataset to store documents and embeddings. + table_name (str): BigQuery table name. + location (str, optional): BigQuery region. Defaults to + `US`(multi-region). + content_field (str): Specifies the column to store the content. + Defaults to `content`. + metadata_field (str): Specifies the column to store the metadata. + Defaults to `metadata`. + text_embedding_field (str): Specifies the column to store + the embeddings vector. + Defaults to `text_embedding`. + doc_id_field (str): Specifies the column to store the document id. + Defaults to `doc_id`. + distance_strategy (DistanceStrategy, optional): + Determines the strategy employed for calculating + the distance between vectors in the embedding space. + Defaults to EUCLIDEAN_DISTANCE. + Available options are: + - COSINE: Measures the similarity between two vectors of an inner + product space. + - EUCLIDEAN_DISTANCE: Computes the Euclidean distance between + two vectors. This metric considers the geometric distance in + the vector space, and might be more suitable for embeddings + that rely on spatial relationships. This is the default behavior + credentials (Credentials, optional): Custom Google Cloud credentials + to use. Defaults to None. + """ + try: + from google.cloud import bigquery + + self.bq_client = bigquery.Client( + project=project_id, location=location, credentials=credentials + ) + except ModuleNotFoundError: + raise ImportError( + "Please, install or upgrade the google-cloud-bigquery library: " + "pip install google-cloud-bigquery" + ) + self._logger = logging.getLogger(__name__) + self._creating_index = False + self._have_index = False + self.embedding_model = embedding + self.project_id = project_id + self.dataset_name = dataset_name + self.table_name = table_name + self.location = location + self.content_field = content_field + self.metadata_field = metadata_field + self.text_embedding_field = text_embedding_field + self.doc_id_field = doc_id_field + self.distance_strategy = distance_strategy + self._full_table_id = ( + f"{self.project_id}." f"{self.dataset_name}." f"{self.table_name}" + ) + self._logger.debug("Using table `%s`", self.full_table_id) + with _vector_table_lock: + self.vectors_table = self._initialize_table() + self._last_index_check = datetime.min + self._initialize_vector_index() + + def _initialize_table(self) -> Any: + """Validates or creates the BigQuery table.""" + from google.cloud import bigquery + + table_ref = bigquery.TableReference.from_string(self._full_table_id) + table = self.bq_client.create_table(table_ref, exists_ok=True) + changed_schema = False + schema = table.schema.copy() + columns = {c.name: c for c in schema} + if self.doc_id_field not in columns: + changed_schema = True + schema.append( + bigquery.SchemaField(name=self.doc_id_field, field_type="STRING") + ) + elif ( + columns[self.doc_id_field].field_type != "STRING" + or columns[self.doc_id_field].mode == "REPEATED" + ): + raise ValueError(f"Column {self.doc_id_field} must be of " "STRING type") + if self.metadata_field not in columns: + changed_schema = True + schema.append( + bigquery.SchemaField(name=self.metadata_field, field_type="JSON") + ) + elif ( + columns[self.metadata_field].field_type not in ["JSON", "STRING"] + or columns[self.metadata_field].mode == "REPEATED" + ): + raise ValueError( + f"Column {self.metadata_field} must be of STRING or JSON type" + ) + if self.content_field not in columns: + changed_schema = True + schema.append( + bigquery.SchemaField(name=self.content_field, field_type="STRING") + ) + elif ( + columns[self.content_field].field_type != "STRING" + or columns[self.content_field].mode == "REPEATED" + ): + raise ValueError(f"Column {self.content_field} must be of " "STRING type") + if self.text_embedding_field not in columns: + changed_schema = True + schema.append( + bigquery.SchemaField( + name=self.text_embedding_field, + field_type="FLOAT64", + mode="REPEATED", + ) + ) + elif ( + columns[self.text_embedding_field].field_type not in ("FLOAT", "FLOAT64") + or columns[self.text_embedding_field].mode != "REPEATED" + ): + raise ValueError( + f"Column {self.text_embedding_field} must be of " "ARRAY type" + ) + if changed_schema: + self._logger.debug("Updated table `%s` schema.", self.full_table_id) + table.schema = schema + table = self.bq_client.update_table(table, fields=["schema"]) + return table + + def _initialize_vector_index(self) -> Any: + """ + A vector index in BigQuery table enables efficient + approximate vector search. + """ + from google.cloud import bigquery + + if self._have_index or self._creating_index: + # Already have an index or in the process of creating one. + return + if ( + datetime.utcnow() - self._last_index_check + ).total_seconds() < _INDEX_CHECK_PERIOD_SECONDS: + return + with _vector_table_lock: + if self._creating_index or self._have_index: + return + self._last_index_check = datetime.utcnow() + # Check if index exists, create if necessary + check_query = ( + f"SELECT 1 FROM `{self.project_id}.{self.dataset_name}" + ".INFORMATION_SCHEMA.VECTOR_INDEXES` WHERE" + f" table_name = '{self.table_name}'" + ) + job = self.bq_client.query( + check_query, api_method=bigquery.enums.QueryApiMethod.QUERY + ) + if job.result().total_rows == 0: + # Need to create an index. Make it in a separate thread. + self._create_index_in_background() + else: + self._logger.debug("Vector index already exists.") + self._have_index = True + + def _create_index_in_background(self): + if self._have_index or self._creating_index: + # Already have an index or in the process of creating one. + return + self._creating_index = True + self._logger.debug("Trying to create a vector index.") + thread = Thread(target=self._create_index, daemon=True) + thread.start() + + def _create_index(self): + from google.api_core.exceptions import ClientError + + if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE: + distance_type = "EUCLIDEAN" + elif self.distance_strategy == DistanceStrategy.COSINE: + distance_type = "COSINE" + # Default to EUCLIDEAN_DISTANCE + else: + distance_type = "EUCLIDEAN" + index_name = f"{self.table_name}_langchain_index" + try: + sql = f""" + CREATE VECTOR INDEX IF NOT EXISTS + `{index_name}` + ON `{self.full_table_id}`({self.text_embedding_field}) + OPTIONS(distance_type="{distance_type}", index_type="IVF") + """ + self.bq_client.query(sql).result() + self._have_index = True + except ClientError as ex: + self._logger.debug("Vector index creation failed (%s).", ex.args[0]) + finally: + self._creating_index = False + + def _persist(self, data: Dict[str, Any]) -> None: + """Saves documents and embeddings to BigQuery.""" + from google.cloud import bigquery + + data_len = len(data[list(data.keys())[0]]) + if data_len == 0: + return + + list_of_dicts = [dict(zip(data, t)) for t in zip(*data.values())] + + job_config = bigquery.LoadJobConfig() + job_config.schema = self.vectors_table.schema + job_config.schema_update_options = ( + bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION + ) + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + job = self.bq_client.load_table_from_json( + list_of_dicts, self.vectors_table, job_config=job_config + ) + job.result() + + @property + def embeddings(self) -> Optional[Embeddings]: + return self.embedding_model + + @property + def full_table_id(self) -> str: + return self._full_table_id + + def add_texts( + self, + texts: List[str], + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> List[str]: + """Run more texts through the embeddings and add to the vectorstore. + + Args: + texts: List of strings to add to the vectorstore. + metadatas: Optional list of metadata associated with the texts. + + Returns: + List of ids from adding the texts into the vectorstore. + """ + embs = self.embedding_model.embed_documents(texts) + return self.add_texts_with_embeddings(texts, embs, metadatas, **kwargs) + + def add_texts_with_embeddings( + self, + texts: List[str], + embs: List[List[float]], + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> List[str]: + """Run more texts through the embeddings and add to the vectorstore. + + Args: + texts: List of strings to add to the vectorstore. + embs: List of lists of floats with text embeddings for texts. + metadatas: Optional list of metadata associated with the texts. + + Returns: + List of ids from adding the texts into the vectorstore. + """ + ids = [uuid.uuid4().hex for _ in texts] + values_dict: Dict[str, List[Any]] = { + self.content_field: texts, + self.doc_id_field: ids, + } + if not metadatas: + metadatas = [] + len_diff = len(ids) - len(metadatas) + add_meta = [None for _ in range(0, len_diff)] + metadatas = [m if m is not None else {} for m in metadatas + add_meta] + values_dict[self.metadata_field] = metadatas + values_dict[self.text_embedding_field] = embs + self._persist(values_dict) + return ids + + def get_documents( + self, ids: Optional[List[str]] = None, filter: Optional[Dict[str, Any]] = None + ) -> List[Document]: + """Search documents by their ids or metadata values. + + Args: + ids: List of ids of documents to retrieve from the vectorstore. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + Returns: + List of ids from adding the texts into the vectorstore. + """ + if ids and len(ids) > 0: + from google.cloud import bigquery + + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ArrayQueryParameter("ids", "STRING", ids), + ] + ) + id_expr = f"{self.doc_id_field} IN UNNEST(@ids)" + else: + job_config = None + id_expr = "TRUE" + if filter: + filter_expressions = [] + for i in filter.items(): + if isinstance(i[1], float): + expr = ( + "ABS(CAST(JSON_VALUE(" + f"`{self.metadata_field}`,'$.{i[0]}') " + f"AS FLOAT64) - {i[1]}) " + f"<= {sys.float_info.epsilon}" + ) + else: + val = str(i[1]).replace('"', '\\"') + expr = ( + f"JSON_VALUE(`{self.metadata_field}`,'$.{i[0]}')" f' = "{val}"' + ) + filter_expressions.append(expr) + filter_expression_str = " AND ".join(filter_expressions) + where_filter_expr = f" AND ({filter_expression_str})" + else: + where_filter_expr = "" + + job = self.bq_client.query( + f""" + SELECT * FROM `{self.full_table_id}` WHERE {id_expr} + {where_filter_expr} + """, + job_config=job_config, + ) + docs: List[Document] = [] + for row in job: + metadata = None + if self.metadata_field: + metadata = row[self.metadata_field] + if metadata: + metadata = json.loads(metadata) + else: + metadata = {} + metadata["__id"] = row[self.doc_id_field] + doc = Document(page_content=row[self.content_field], metadata=metadata) + docs.append(doc) + return docs + + def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: + """Delete by vector ID or other criteria. + + Args: + ids: List of ids to delete. + **kwargs: Other keyword arguments that subclasses might use. + + Returns: + Optional[bool]: True if deletion is successful, + False otherwise, None if not implemented. + """ + if not ids or len(ids) == 0: + return True + from google.cloud import bigquery + + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ArrayQueryParameter("ids", "STRING", ids), + ] + ) + self.bq_client.query( + f""" + DELETE FROM `{self.full_table_id}` WHERE {self.doc_id_field} + IN UNNEST(@ids) + """, + job_config=job_config, + ).result() + return True + + async def adelete( + self, ids: Optional[List[str]] = None, **kwargs: Any + ) -> Optional[bool]: + """Delete by vector ID or other criteria. + + Args: + ids: List of ids to delete. + **kwargs: Other keyword arguments that subclasses might use. + + Returns: + Optional[bool]: True if deletion is successful, + False otherwise, None if not implemented. + """ + return await asyncio.get_running_loop().run_in_executor( + None, partial(self.delete, **kwargs), ids + ) + + def _search_with_score_and_embeddings_by_vector( + self, + embedding: List[float], + k: int = DEFAULT_TOP_K, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + ) -> List[Tuple[Document, List[float], float]]: + from google.cloud import bigquery + + # Create an index if no index exists. + if not self._have_index and not self._creating_index: + self._initialize_vector_index() + # Prepare filter + filter_expr = "TRUE" + if filter: + filter_expressions = [] + for i in filter.items(): + if isinstance(i[1], float): + expr = ( + "ABS(CAST(JSON_VALUE(" + f"base.`{self.metadata_field}`,'$.{i[0]}') " + f"AS FLOAT64) - {i[1]}) " + f"<= {sys.float_info.epsilon}" + ) + else: + val = str(i[1]).replace('"', '\\"') + expr = ( + f"JSON_VALUE(base.`{self.metadata_field}`,'$.{i[0]}')" + f' = "{val}"' + ) + filter_expressions.append(expr) + filter_expression_str = " AND ".join(filter_expressions) + filter_expr += f" AND ({filter_expression_str})" + # Configure and run a query job. + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ArrayQueryParameter("v", "FLOAT64", embedding), + ], + use_query_cache=False, + priority=bigquery.QueryPriority.BATCH, + ) + if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE: + distance_type = "EUCLIDEAN" + elif self.distance_strategy == DistanceStrategy.COSINE: + distance_type = "COSINE" + # Default to EUCLIDEAN_DISTANCE + else: + distance_type = "EUCLIDEAN" + if brute_force: + options_string = ",options => '{\"use_brute_force\":true}'" + elif fraction_lists_to_search: + if fraction_lists_to_search == 0 or fraction_lists_to_search >= 1.0: + raise ValueError( + "`fraction_lists_to_search` must be between " "0.0 and 1.0" + ) + options_string = ( + ',options => \'{"fraction_lists_to_search":' + f"{fraction_lists_to_search}}}'" + ) + else: + options_string = "" + query = f""" + SELECT + base.*, + distance AS _vector_search_distance + FROM VECTOR_SEARCH( + TABLE `{self.full_table_id}`, + "{self.text_embedding_field}", + (SELECT @v AS {self.text_embedding_field}), + distance_type => "{distance_type}", + top_k => {k} + {options_string} + ) + WHERE {filter_expr} + LIMIT {k} + """ + document_tuples: List[Tuple[Document, List[float], float]] = [] + # TODO(vladkol): Use jobCreationMode=JOB_CREATION_OPTIONAL when available. + job = self.bq_client.query( + query, job_config=job_config, api_method=bigquery.enums.QueryApiMethod.QUERY + ) + # Process job results. + for row in job: + metadata = row[self.metadata_field] + if metadata: + metadata = json.loads(metadata) + else: + metadata = {} + metadata["__id"] = row[self.doc_id_field] + doc = Document(page_content=row[self.content_field], metadata=metadata) + document_tuples.append( + (doc, row[self.text_embedding_field], row["_vector_search_distance"]) + ) + return document_tuples + + def similarity_search_with_score_by_vector( + self, + embedding: List[float], + k: int = DEFAULT_TOP_K, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Return docs most similar to embedding vector. + + Args: + embedding: Embedding to look up documents similar to. + k: Number of Documents to return. Defaults to 4. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + + Returns: + List of Documents most similar to the query vector with distance. + """ + del kwargs + document_tuples = self._search_with_score_and_embeddings_by_vector( + embedding, k, filter, brute_force, fraction_lists_to_search + ) + return [(doc, distance) for doc, _, distance in document_tuples] + + def similarity_search_by_vector( + self, + embedding: List[float], + k: int = DEFAULT_TOP_K, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Return docs most similar to embedding vector. + + Args: + embedding: Embedding to look up documents similar to. + k: Number of Documents to return. Defaults to 4. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + + Returns: + List of Documents most similar to the query vector. + """ + tuples = self.similarity_search_with_score_by_vector( + embedding, k, filter, brute_force, fraction_lists_to_search, **kwargs + ) + return [i[0] for i in tuples] + + def similarity_search_with_score( + self, + query: str, + k: int = DEFAULT_TOP_K, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Run similarity search with score. + + Args: + query: search query text. + k: Number of Documents to return. Defaults to 4. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + + Returns: + List of Documents most similar to the query vector, with similarity scores. + """ + emb = self.embedding_model.embed_query(query) # type: ignore + return self.similarity_search_with_score_by_vector( + emb, k, filter, brute_force, fraction_lists_to_search, **kwargs + ) + + def similarity_search( + self, + query: str, + k: int = DEFAULT_TOP_K, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Run similarity search. + + Args: + query: search query text. + k: Number of Documents to return. Defaults to 4. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + + Returns: + List of Documents most similar to the query vector. + """ + tuples = self.similarity_search_with_score( + query, k, filter, brute_force, fraction_lists_to_search, **kwargs + ) + return [i[0] for i in tuples] + + def _select_relevance_score_fn(self) -> Callable[[float], float]: + if self.distance_strategy == DistanceStrategy.COSINE: + return BigQueryVectorSearch._cosine_relevance_score_fn + else: + raise ValueError( + "Relevance score is not supported " + f"for `{self.distance_strategy}` distance." + ) + + def max_marginal_relevance_search( + self, + query: str, + k: int = DEFAULT_TOP_K, + fetch_k: int = DEFAULT_TOP_K * 5, + lambda_mult: float = 0.5, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Return docs selected using the maximal marginal relevance. + + Maximal marginal relevance optimizes for similarity to query AND diversity + among selected documents. + + Args: + query: search query text. + k: Number of Documents to return. Defaults to 4. + fetch_k: Number of Documents to fetch to pass to MMR algorithm. + lambda_mult: Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + Returns: + List of Documents selected by maximal marginal relevance. + """ + query_embedding = self.embedding_model.embed_query( # type: ignore + query + ) + doc_tuples = self._search_with_score_and_embeddings_by_vector( + query_embedding, fetch_k, filter, brute_force, fraction_lists_to_search + ) + doc_embeddings = [d[1] for d in doc_tuples] + mmr_doc_indexes = maximal_marginal_relevance( + np.array(query_embedding), doc_embeddings, lambda_mult=lambda_mult, k=k + ) + return [doc_tuples[i][0] for i in mmr_doc_indexes] + + def max_marginal_relevance_search_by_vector( + self, + embedding: List[float], + k: int = DEFAULT_TOP_K, + fetch_k: int = DEFAULT_TOP_K * 5, + lambda_mult: float = 0.5, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Return docs selected using the maximal marginal relevance. + + Maximal marginal relevance optimizes for similarity to query AND diversity + among selected documents. + + Args: + embedding: Embedding to look up documents similar to. + k: Number of Documents to return. Defaults to 4. + fetch_k: Number of Documents to fetch to pass to MMR algorithm. + lambda_mult: Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5. + filter: Filter on metadata properties, e.g. + { + "str_property": "foo", + "int_property": 123 + } + brute_force: Whether to use brute force search. Defaults to False. + fraction_lists_to_search: Optional percentage of lists to search, + must be in range 0.0 and 1.0, exclusive. + If Node, uses service's default which is 0.05. + Returns: + List of Documents selected by maximal marginal relevance. + """ + doc_tuples = self._search_with_score_and_embeddings_by_vector( + embedding, fetch_k, filter, brute_force, fraction_lists_to_search + ) + doc_embeddings = [d[1] for d in doc_tuples] + mmr_doc_indexes = maximal_marginal_relevance( + np.array(embedding), doc_embeddings, lambda_mult=lambda_mult, k=k + ) + return [doc_tuples[i][0] for i in mmr_doc_indexes] + + async def amax_marginal_relevance_search( + self, + query: str, + k: int = DEFAULT_TOP_K, + fetch_k: int = DEFAULT_TOP_K * 5, + lambda_mult: float = 0.5, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Return docs selected using the maximal marginal relevance.""" + + func = partial( + self.max_marginal_relevance_search, + query, + k=k, + fetch_k=fetch_k, + lambda_mult=lambda_mult, + filter=filter, + brute_force=brute_force, + fraction_lists_to_search=fraction_lists_to_search, + **kwargs, + ) + return await asyncio.get_event_loop().run_in_executor(None, func) + + async def amax_marginal_relevance_search_by_vector( + self, + embedding: List[float], + k: int = DEFAULT_TOP_K, + fetch_k: int = DEFAULT_TOP_K * 5, + lambda_mult: float = 0.5, + filter: Optional[Dict[str, Any]] = None, + brute_force: bool = False, + fraction_lists_to_search: Optional[float] = None, + **kwargs: Any, + ) -> List[Document]: + """Return docs selected using the maximal marginal relevance.""" + return await asyncio.get_running_loop().run_in_executor( + None, + partial(self.max_marginal_relevance_search_by_vector, **kwargs), + embedding, + k, + fetch_k, + lambda_mult, + filter, + brute_force, + fraction_lists_to_search, + ) + + @classmethod + def from_texts( + cls: Type["BigQueryVectorSearch"], + texts: List[str], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + **kwargs: Any, + ) -> "BigQueryVectorSearch": + """Return VectorStore initialized from texts and embeddings.""" + vs_obj = BigQueryVectorSearch(embedding=embedding, **kwargs) + vs_obj.add_texts(texts, metadatas) + return vs_obj diff --git a/libs/community/tests/integration_tests/vectorstores/test_bigquery_vector_search.py b/libs/community/tests/integration_tests/vectorstores/test_bigquery_vector_search.py new file mode 100644 index 0000000000000..57da87d669655 --- /dev/null +++ b/libs/community/tests/integration_tests/vectorstores/test_bigquery_vector_search.py @@ -0,0 +1,102 @@ +"""Test BigQuery Vector Search. +In order to run this test, you need to install Google Cloud BigQuery SDK +pip install google-cloud-bigquery +Your end-user credentials would be used to make the calls (make sure you've run +`gcloud auth login` first). +""" + +import os +import uuid + +import pytest + +from langchain_community.vectorstores.bigquery_vector_search import BigQueryVectorSearch +from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings + +TEST_TABLE_NAME = "langchain_test_table" + + +@pytest.fixture(scope="class") +def store(request: pytest.FixtureRequest) -> BigQueryVectorSearch: + """BigQueryVectorStore tests context. + + In order to run this test, you define PROJECT environment variable + with GCP project id. + + Example: + export PROJECT=... + """ + from google.cloud import bigquery + + bigquery.Client(location="US").create_dataset( + TestBigQueryVectorStore.dataset_name, exists_ok=True + ) + TestBigQueryVectorStore.store = BigQueryVectorSearch( + project_id=os.environ.get("PROJECT", None), + embedding=FakeEmbeddings(), + dataset_name=TestBigQueryVectorStore.dataset_name, + table_name=TEST_TABLE_NAME, + ) + TestBigQueryVectorStore.store.add_texts( + TestBigQueryVectorStore.texts, TestBigQueryVectorStore.metadatas + ) + + def teardown() -> None: + bigquery.Client(location="US").delete_dataset( + TestBigQueryVectorStore.dataset_name, + delete_contents=True, + not_found_ok=True, + ) + + request.addfinalizer(teardown) + return TestBigQueryVectorStore.store + + +class TestBigQueryVectorStore: + """BigQueryVectorStore tests class.""" + + dataset_name = uuid.uuid4().hex + store: BigQueryVectorSearch + texts = ["apple", "ice cream", "Saturn", "candy", "banana"] + metadatas = [ + { + "kind": "fruit", + }, + { + "kind": "treat", + }, + { + "kind": "planet", + }, + { + "kind": "treat", + }, + { + "kind": "fruit", + }, + ] + + def test_semantic_search(self, store: BigQueryVectorSearch) -> None: + """Test on semantic similarity.""" + docs = store.similarity_search("food", k=4) + print(docs) + kinds = [d.metadata["kind"] for d in docs] + assert "fruit" in kinds + assert "treat" in kinds + assert "planet" not in kinds + + def test_semantic_search_filter_fruits(self, store: BigQueryVectorSearch) -> None: + """Test on semantic similarity with metadata filter.""" + docs = store.similarity_search("food", filter={"kind": "fruit"}) + kinds = [d.metadata["kind"] for d in docs] + assert "fruit" in kinds + assert "treat" not in kinds + assert "planet" not in kinds + + def test_get_doc_by_filter(self, store: BigQueryVectorSearch) -> None: + """Test on document retrieval with metadata filter.""" + docs = store.get_documents(filter={"kind": "fruit"}) + kinds = [d.metadata["kind"] for d in docs] + assert "fruit" in kinds + assert "treat" not in kinds + assert "planet" not in kinds