diff --git a/.gitignore b/.gitignore
index 68bc17f..51fad65 100644
--- a/.gitignore
+++ b/.gitignore
@@ -158,3 +158,6 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+
+datasets/*
+vector_stores/*
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..b098fa1
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,2 @@
+run-ci:
+	black src/
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..40d73e6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,22 @@
+## Installation
+
+```bash
+git clone git@github.com:explodinggradients/synthetic-qa-paper.git
+```
+
+```bash
+cd synthetic-qa-paper
+```
+
+```bash
+pip install -r requirements.txt
+```
+
+```bash
+pip install -e .
+```
+
+
+## Usage
+
+See example [notebook](notebooks/experiment.ipynb) for usage.
\ No newline at end of file
diff --git a/_run.ipynb b/_run.ipynb
deleted file mode 100755
index 1615479..0000000
--- a/_run.ipynb
+++ /dev/null
@@ -1 +0,0 @@
-{"cells":[{"cell_type":"code","execution_count":1,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"execution":{"iopub.execute_input":"2024-07-23T12:09:42.337401Z","iopub.status.busy":"2024-07-23T12:09:42.336717Z","iopub.status.idle":"2024-07-23T12:10:37.879309Z","shell.execute_reply":"2024-07-23T12:10:37.878172Z","shell.execute_reply.started":"2024-07-23T12:09:42.337368Z"},"id":"hrFYqiXsCPXD","outputId":"9d3caf44-b4e3-4686-e7a9-3d296bbce1ce","trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":["\u001b[33mWARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError(': Failed to establish a new connection: [Errno -2] Name or service not known')': /simple/langchain/\u001b[0m\u001b[33m\n","\u001b[0m\u001b[33m WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ReadTimeoutError(\"HTTPSConnectionPool(host='files.pythonhosted.org', port=443): Read timed out. (read timeout=15)\")': /packages/3c/78/c1de55eb3311f2c200a8b91724414b8d6f5ae78891c15d9d936ea43c3dba/marshmallow-3.22.0-py3-none-any.whl.metadata\u001b[0m\u001b[33m\n","\u001b[0m\u001b[33mWARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ReadTimeoutError(\"HTTPSConnectionPool(host='pypi.org', port=443): Read timed out. 
(read timeout=15)\")': /simple/typing-inspect/\u001b[0m\u001b[33m\n","\u001b[0m^C\n"]}],"source":["!pip install -q langchain langchain-core langchain-community langchain-huggingface ragatouille"]},{"cell_type":"code","execution_count":null,"metadata":{"execution":{"iopub.execute_input":"2024-07-23T12:16:19.741127Z","iopub.status.busy":"2024-07-23T12:16:19.740107Z","iopub.status.idle":"2024-07-23T12:16:23.617034Z","shell.execute_reply":"2024-07-23T12:16:23.615914Z","shell.execute_reply.started":"2024-07-23T12:16:19.741088Z"},"trusted":true},"outputs":[],"source":["%cd /content\n","!rm -rf card\n","!git clone https://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@github.com/ErfanMoosaviMonazzah/card.git\n","\n","%cd /content/card/src2"]},{"cell_type":"code","execution_count":1,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"execution":{"iopub.execute_input":"2024-07-23T12:16:29.097980Z","iopub.status.busy":"2024-07-23T12:16:29.097568Z"},"id":"aCR1Kd-HHqQ7","outputId":"bdd95aa8-1213-4a5c-d27c-4dc2cf4b17de","trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":["- Loading docs from Cache: \"./caches/sample_experiment/cached_docs.pkl\"\n","- Loading Embedding Model & Tokenizer: thenlper/gte-small\n","- Chunk Size (#Tokens): 256\n","- Loading chunks from Cache: \"./caches/sample_experiment/cached_chunks.pkl\"\n","- Loading Vector DB from Cache: \"./caches/sample_experiment/cached_vector_db\"\n","- Loading Queries from Cache: \"./caches/sample_experiment/cached_queries.pkl\"\n","- Loading Rets from Cache: \"./caches/sample_experiment/cached_rets.pkl\"\n","- Loading Generator Model & Tokenizer: \"/home/pargar/hub/models/microsoft/Phi-3.5-mini-instruct\"\n","`flash-attention` package not found, consider installing for better performance: No module named 'flash_attn'.\n","Current `flash-attention` does not support `window_size`. Either upgrade or use `attn_implementation='eager'`.\n","Loading checkpoint shards: 100%|██████████████████| 2/2 [00:02<00:00, 1.38s/it]\n","/home/pargar/miniconda3/envs/tomcat_temp/lib/python3.11/site-packages/transformers/tokenization_utils_base.py:1601: FutureWarning: `clean_up_tokenization_spaces` was not set. It will be set to `True` by default. This behavior will be depracted in transformers v4.45, and will be then set to `False` by default. For more details check this issue: https://github.com/huggingface/transformers/issues/31884\n"," warnings.warn(\n","/home/pargar/miniconda3/envs/tomcat_temp/lib/python3.11/site-packages/colbert/utils/amp.py:12: FutureWarning: `torch.cuda.amp.GradScaler(args...)` is deprecated. Please use `torch.amp.GradScaler('cuda', args...)` instead.\n"," self.scaler = torch.cuda.amp.GradScaler()\n","- Loading reranked_rets from Cache: \"./caches/sample_experiment/cached_reranked_rets.pkl\"\n"," 0%| | 0/2556 [00:00 512). 
Running this sequence through the model will result in indexing errors\n","- Splitting Documents to Chunks: 100%|████████| 609/609 [00:25<00:00, 24.01it/s]\n","- Removing Duplicated Chunks: 100%|████| 8193/8193 [00:00<00:00, 1177088.88it/s]\n","- Caching Chunks at \"/home/erfan/Desktop/actives/card/src/experiments/multihoprag_bge/cached_chunks.pkl\"\n","- 609 Documents splitted into 8,159 Chunks\n","- Vector DB: Start Embedding at 2024-07-24T12:58:27.145320\n","- Vector DB: Finished Embedding at 2024-07-24T13:10:08.573201\n","- Caching Vector DB at \"/home/erfan/Desktop/actives/card/src/experiments/multihoprag_bge/cached_vector_db\"\n","- Loading Queries Dataset from HF: yixuantt/MultiHopRAG (MultiHopRAG)\n","- Caching Queries at \"/home/erfan/Desktop/actives/card/src/experiments/multihoprag_bge/cached_queries.pkl\"\n","- Fetching Embeddings (2556 Queries)\n","- Searching Vector Index\n","- Fetching Similar Documents\n","- Saving Retrieved Documents: /home/erfan/Desktop/actives/card/src/experiments/multihoprag_bge_ret.json\n"]}],"source":["!python run_rag.py --yaml-config experiments/multihoprag_bge.yaml"]},{"cell_type":"code","execution_count":8,"metadata":{},"outputs":[],"source":["from datasets import load_dataset\n","import json, ret_evals\n","\n","with open('experiments/multihoprag_bge_ret.json', 'r') as file:\n"," rets = json.load(file)['ret']\n"," \n","ds = load_dataset('yixuantt/MultiHopRAG', 'MultiHopRAG', split='train')\n","golds = [[gold['fact'] for gold in evs] for evs in ds['evidence_list']]\n","\n","ls_rets = []\n","ls_golds = []\n","\n","for row, ret, gold in zip(ds, rets, golds):\n"," if row['question_type'] == 'null_query':\n"," continue\n"," ls_rets.append(ret)\n"," ls_golds.append(gold)"]},{"cell_type":"code","execution_count":9,"metadata":{},"outputs":[{"name":"stderr","output_type":"stream","text":["Hits@4 0.5871: : 2255it [00:02, 1005.75it/s]\n","Hits@10 0.7508: : 2255it [00:02, 1127.07it/s]\n","MAP@10 0.2121: : 2255it [00:02, 1059.75it/s]\n","MRR@10 0.4501: : 2255it [00:02, 1063.97it/s]\n"]}],"source":["_ = ret_evals.hits_at(4, ls_rets, ls_golds)\n","_ = ret_evals.hits_at(10, ls_rets, ls_golds)\n","_ = ret_evals.map_at(10, ls_rets, ls_golds)\n","_ = ret_evals.mrr_at(10, ls_rets, ls_golds)"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":[]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["# [5:25 PM] Luis Espinosa-Anke\n","# import numpy as np\n"," \n","# # Assuming questionsdb and vectordb are already defined and are numpy arrays\n","\n","# res = questionsdb @ vectordb.T\n"," \n","# # Flatten the result to a 1D array\n","\n","# flattened_res = res.flatten()\n"," \n","# # Get the indices of the top 10 most similar vectors\n","\n","# top_10_indices = np.argsort(flattened_res)[-10:][::-1]\n"," \n","# # Retrieve the top 10 most similar vectors\n","\n","# top_10_similar = flattened_res[top_10_indices]\n"," \n","# print(\"Indices of top 10 most similar vectors:\", top_10_indices)\n","\n","# print(\"Top 10 most similar vectors:\", top_10_similar)\n"," \n","# [5:25 PM] Luis Espinosa-Anke\n","# vectordb = rag.vectorizer.vector_db.index.reconstruct_n(0)\n"," 
"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":[]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":[]}],"metadata":{"accelerator":"GPU","colab":{"gpuType":"T4","provenance":[]},"kaggle":{"accelerator":"gpu","dataSources":[],"dockerImageVersionId":30747,"isGpuEnabled":true,"isInternetEnabled":true,"language":"python","sourceType":"notebook"},"kernelspec":{"display_name":"Python 3","language":"python","name":"python3"},"language_info":{"codemirror_mode":{"name":"ipython","version":3},"file_extension":".py","mimetype":"text/x-python","name":"python","nbconvert_exporter":"python","pygments_lexer":"ipython3","version":"3.12.2"}},"nbformat":4,"nbformat_minor":4} diff --git a/caches/sample_experiment/cached_augmented_generations.pkl b/caches/sample_experiment/cached_augmented_generations.pkl deleted file mode 100755 index 6a55cb4..0000000 Binary files a/caches/sample_experiment/cached_augmented_generations.pkl and /dev/null differ diff --git a/caches/sample_experiment/cached_chunks.pkl b/caches/sample_experiment/cached_chunks.pkl deleted file mode 100755 index f53d537..0000000 Binary files a/caches/sample_experiment/cached_chunks.pkl and /dev/null differ diff --git a/caches/sample_experiment/cached_docs.pkl b/caches/sample_experiment/cached_docs.pkl deleted file mode 100755 index d59e633..0000000 Binary files a/caches/sample_experiment/cached_docs.pkl and /dev/null differ diff --git a/caches/sample_experiment/cached_queries.pkl b/caches/sample_experiment/cached_queries.pkl deleted file mode 100755 index 776683f..0000000 Binary files a/caches/sample_experiment/cached_queries.pkl and /dev/null differ diff --git a/caches/sample_experiment/cached_reranked_rets.pkl b/caches/sample_experiment/cached_reranked_rets.pkl deleted file mode 100755 index 9d303cd..0000000 Binary files a/caches/sample_experiment/cached_reranked_rets.pkl and /dev/null differ diff --git a/caches/sample_experiment/cached_rets.pkl b/caches/sample_experiment/cached_rets.pkl deleted file mode 100755 index 6b48678..0000000 Binary files a/caches/sample_experiment/cached_rets.pkl and /dev/null differ diff --git a/caches/sample_experiment/cached_vector_db/index.faiss b/caches/sample_experiment/cached_vector_db/index.faiss deleted file mode 100755 index 57748e7..0000000 Binary files a/caches/sample_experiment/cached_vector_db/index.faiss and /dev/null differ diff --git a/caches/sample_experiment/cached_vector_db/index.pkl b/caches/sample_experiment/cached_vector_db/index.pkl deleted file mode 100755 index d37babc..0000000 Binary files a/caches/sample_experiment/cached_vector_db/index.pkl and /dev/null differ diff --git a/caches/sample_experiment/disabled__cached_reranked_rets.pkl b/caches/sample_experiment/disabled__cached_reranked_rets.pkl deleted file mode 100755 index 9d303cd..0000000 Binary files a/caches/sample_experiment/disabled__cached_reranked_rets.pkl and /dev/null differ diff --git a/customized_faiss.py b/customized_faiss.py deleted file mode 100755 index 5bbdfe5..0000000 --- a/customized_faiss.py +++ /dev/null @@ -1,1284 +0,0 @@ -from __future__ import annotations - -import logging -import operator -import os -import pickle -import uuid -import warnings -from pathlib import Path -from typing import ( - Any, - Callable, - Dict, - Iterable, - List, - Optional, - Sized, - Tuple, - Union, -) - -import numpy as np -from langchain_core.documents import Document -from langchain_core.embeddings import Embeddings -from langchain_core.runnables.config import 
run_in_executor -from langchain_core.vectorstores import VectorStore - -from langchain_community.docstore.base import AddableMixin, Docstore -from langchain_community.docstore.in_memory import InMemoryDocstore -from langchain_community.vectorstores.utils import ( - DistanceStrategy, - maximal_marginal_relevance, -) - -logger = logging.getLogger(__name__) - - -def dependable_faiss_import(no_avx2: Optional[bool] = None) -> Any: - """ - Import faiss if available, otherwise raise error. - If FAISS_NO_AVX2 environment variable is set, it will be considered - to load FAISS with no AVX2 optimization. - - Args: - no_avx2: Load FAISS strictly with no AVX2 optimization - so that the vectorstore is portable and compatible with other devices. - """ - if no_avx2 is None and "FAISS_NO_AVX2" in os.environ: - no_avx2 = bool(os.getenv("FAISS_NO_AVX2")) - - try: - if no_avx2: - from faiss import swigfaiss as faiss - else: - import faiss - except ImportError: - raise ImportError( - "Could not import faiss python package. " - "Please install it with `pip install faiss-gpu` (for CUDA supported GPU) " - "or `pip install faiss-cpu` (depending on Python version)." - ) - return faiss - - -def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None: - if isinstance(x, Sized) and isinstance(y, Sized) and len(x) != len(y): - raise ValueError( - f"{x_name} and {y_name} expected to be equal length but " - f"len({x_name})={len(x)} and len({y_name})={len(y)}" - ) - return - - -class FAISS(VectorStore): - """`Meta Faiss` vector store. - - To use, you must have the ``faiss`` python package installed. - - Example: - .. code-block:: python - - from langchain_community.embeddings.openai import OpenAIEmbeddings - from langchain_community.vectorstores import FAISS - - embeddings = OpenAIEmbeddings() - texts = ["FAISS is an important library", "LangChain supports FAISS"] - faiss = FAISS.from_texts(texts, embeddings) - - """ - - def __init__( - self, - embedding_function: Union[ - Callable[[str], List[float]], - Embeddings, - ], - index: Any, - docstore: Docstore, - index_to_docstore_id: Dict[int, str], - relevance_score_fn: Optional[Callable[[float], float]] = None, - normalize_L2: bool = False, - distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE, - ): - """Initialize with necessary components.""" - if not isinstance(embedding_function, Embeddings): - logger.warning( - "`embedding_function` is expected to be an Embeddings object, support " - "for passing in a function will soon be removed." 
- ) - self.embedding_function = embedding_function - self.index = index - self.docstore = docstore - self.index_to_docstore_id = index_to_docstore_id - self.distance_strategy = distance_strategy - self.override_relevance_score_fn = relevance_score_fn - self._normalize_L2 = normalize_L2 - if ( - self.distance_strategy != DistanceStrategy.EUCLIDEAN_DISTANCE - and self._normalize_L2 - ): - warnings.warn( - "Normalizing L2 is not applicable for " - f"metric type: {self.distance_strategy}" - ) - - @property - def embeddings(self) -> Optional[Embeddings]: - return ( - self.embedding_function - if isinstance(self.embedding_function, Embeddings) - else None - ) - - def _embed_documents(self, texts: List[str]) -> List[List[float]]: - if isinstance(self.embedding_function, Embeddings): - return self.embedding_function.embed_documents(texts) - else: - return [self.embedding_function(text) for text in texts] - - async def _aembed_documents(self, texts: List[str]) -> List[List[float]]: - if isinstance(self.embedding_function, Embeddings): - return await self.embedding_function.aembed_documents(texts) - else: - # return await asyncio.gather( - # [self.embedding_function(text) for text in texts] - # ) - raise Exception( - "`embedding_function` is expected to be an Embeddings object, support " - "for passing in a function will soon be removed." - ) - - def _embed_query(self, text: str) -> List[float]: - if isinstance(self.embedding_function, Embeddings): - return self.embedding_function.embed_query(text) - else: - return self.embedding_function(text) - - async def _aembed_query(self, text: str) -> List[float]: - if isinstance(self.embedding_function, Embeddings): - return await self.embedding_function.aembed_query(text) - else: - # return await self.embedding_function(text) - raise Exception( - "`embedding_function` is expected to be an Embeddings object, support " - "for passing in a function will soon be removed." - ) - - def __add( - self, - texts: Iterable[str], - embeddings: Iterable[List[float]], - metadatas: Optional[Iterable[dict]] = None, - ids: Optional[List[str]] = None, - ) -> List[str]: - faiss = dependable_faiss_import() - - if not isinstance(self.docstore, AddableMixin): - raise ValueError( - "If trying to add texts, the underlying docstore should support " - f"adding items, which {self.docstore} does not" - ) - - _len_check_if_sized(texts, metadatas, "texts", "metadatas") - _metadatas = metadatas or ({} for _ in texts) - documents = [ - Document(page_content=t, metadata=m) for t, m in zip(texts, _metadatas) - ] - - _len_check_if_sized(documents, embeddings, "documents", "embeddings") - _len_check_if_sized(documents, ids, "documents", "ids") - - if ids and len(ids) != len(set(ids)): - raise ValueError("Duplicate ids found in the ids list.") - - # Add to the index. - vector = np.array(embeddings, dtype=np.float32) - if self._normalize_L2: - faiss.normalize_L2(vector) - self.index.add(vector) - - # Add information to docstore and index. - ids = ids or [str(uuid.uuid4()) for _ in texts] - self.docstore.add({id_: doc for id_, doc in zip(ids, documents)}) - starting_len = len(self.index_to_docstore_id) - index_to_id = {starting_len + j: id_ for j, id_ in enumerate(ids)} - self.index_to_docstore_id.update(index_to_id) - return ids - - def add_texts( - self, - texts: Iterable[str], - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - **kwargs: Any, - ) -> List[str]: - """Run more texts through the embeddings and add to the vectorstore. 
- - Args: - texts: Iterable of strings to add to the vectorstore. - metadatas: Optional list of metadatas associated with the texts. - ids: Optional list of unique IDs. - - Returns: - List of ids from adding the texts into the vectorstore. - """ - texts = list(texts) - embeddings = self._embed_documents(texts) - return self.__add(texts, embeddings, metadatas=metadatas, ids=ids) - - async def aadd_texts( - self, - texts: Iterable[str], - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - **kwargs: Any, - ) -> List[str]: - """Run more texts through the embeddings and add to the vectorstore - asynchronously. - - Args: - texts: Iterable of strings to add to the vectorstore. - metadatas: Optional list of metadatas associated with the texts. - ids: Optional list of unique IDs. - - Returns: - List of ids from adding the texts into the vectorstore. - """ - texts = list(texts) - embeddings = await self._aembed_documents(texts) - return self.__add(texts, embeddings, metadatas=metadatas, ids=ids) - - def add_embeddings( - self, - text_embeddings: Iterable[Tuple[str, List[float]]], - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - **kwargs: Any, - ) -> List[str]: - """Add the given texts and embeddings to the vectorstore. - - Args: - text_embeddings: Iterable pairs of string and embedding to - add to the vectorstore. - metadatas: Optional list of metadatas associated with the texts. - ids: Optional list of unique IDs. - - Returns: - List of ids from adding the texts into the vectorstore. - """ - # Embed and create the documents. - texts, embeddings = zip(*text_embeddings) - return self.__add(texts, embeddings, metadatas=metadatas, ids=ids) - - def similarity_search_with_score_by_vector( - self, - embedding: List[float], - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Tuple[Document, float]]: - """Return docs most similar to query. - - Args: - embedding: Embedding vector to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata. - Defaults to None. If a callable, it must take as input the - metadata dict of Document and return a bool. - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - **kwargs: kwargs to be passed to similarity search. Can include: - score_threshold: Optional, a floating point value between 0 to 1 to - filter the resulting set of retrieved docs - - Returns: - List of documents most similar to the query text and L2 distance - in float for each. Lower score represents more similarity. - """ - faiss = dependable_faiss_import() - vector = np.array([embedding], dtype=np.float32) - # print(f'vector: {vector.shape}') - if self._normalize_L2: - # print('_normalize_L2') - faiss.normalize_L2(vector) - scores, indices = self.index.search(vector, k if filter is None else fetch_k) - # print(f'scores\n{scores}') - # print(f'indices\n{indices}') - docs = [] - - if filter is not None: - filter_func = self._create_filter_func(filter) - - for j, i in enumerate(indices[0]): - if i == -1: - # This happens when not enough docs are returned. 
- continue - _id = self.index_to_docstore_id[i] - doc = self.docstore.search(_id) - if not isinstance(doc, Document): - raise ValueError(f"Could not find document for id {_id}, got {doc}") - if filter is not None: - if filter_func(doc.metadata): - docs.append((doc, scores[0][j])) - else: - docs.append((doc, scores[0][j])) - - score_threshold = kwargs.get("score_threshold") - if score_threshold is not None: - cmp = ( - operator.ge - if self.distance_strategy - in (DistanceStrategy.MAX_INNER_PRODUCT, DistanceStrategy.JACCARD) - else operator.le - ) - docs = [ - (doc, similarity) - for doc, similarity in docs - if cmp(similarity, score_threshold) - ] - return docs[:k] - - - - def batch_similarity_search( - self, - queries: List[str], - # embeddings: List[List[float]], - k: int = 4, - verbose: bool = True, - # filter: Optional[Union[Callable, Dict[str, Any]]] = None, - # fetch_k: int = 20, - # **kwargs: Any, - ) -> List[List[Document]]: - """Return docs most similar to query. - - Args: - embedding: Embedding vector to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata. - Defaults to None. If a callable, it must take as input the - metadata dict of Document and return a bool. - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - **kwargs: kwargs to be passed to similarity search. Can include: - score_threshold: Optional, a floating point value between 0 to 1 to - filter the resulting set of retrieved docs - - Returns: - List of documents most similar to the query text and L2 distance - in float for each. Lower score represents more similarity. - """ - # faiss = dependable_faiss_import() - if verbose: print(f'- Fetching Embeddings ({len(queries)} Queries)') - embeddings = self._embed_documents(queries) - vectors = np.array(embeddings, dtype=np.float32) - if verbose: print('- Searching Vector Index') - _, I = self.index.search(vectors, k) - if verbose: print('- Fetching Similar Documents') - ls_docs = [] - for i in range(I.shape[0]): - docs = [] - for j in range(I.shape[1]): - ind = I[i, j] - doc_ind = self.index_to_docstore_id[ind] - doc = self.docstore.search(doc_ind) - docs.append(doc) - ls_docs.append(docs) - - return ls_docs - - async def asimilarity_search_with_score_by_vector( - self, - embedding: List[float], - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Tuple[Document, float]]: - """Return docs most similar to query asynchronously. - - Args: - embedding: Embedding vector to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter (Optional[Dict[str, Any]]): Filter by metadata. - Defaults to None. If a callable, it must take as input the - metadata dict of Document and return a bool. - - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - **kwargs: kwargs to be passed to similarity search. Can include: - score_threshold: Optional, a floating point value between 0 to 1 to - filter the resulting set of retrieved docs - - Returns: - List of documents most similar to the query text and L2 distance - in float for each. Lower score represents more similarity. - """ - - # This is a temporary workaround to make the similarity search asynchronous. 
- return await run_in_executor( - None, - self.similarity_search_with_score_by_vector, - embedding, - k=k, - filter=filter, - fetch_k=fetch_k, - **kwargs, - ) - - def similarity_search_with_score( - self, - query: str, - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Tuple[Document, float]]: - """Return docs most similar to query. - - Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. - Defaults to None. If a callable, it must take as input the - metadata dict of Document and return a bool. - - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - - Returns: - List of documents most similar to the query text with - L2 distance in float. Lower score represents more similarity. - """ - embedding = self._embed_query(query) - docs = self.similarity_search_with_score_by_vector( - embedding, - k, - filter=filter, - fetch_k=fetch_k, - **kwargs, - ) - return docs - - async def asimilarity_search_with_score( - self, - query: str, - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Tuple[Document, float]]: - """Return docs most similar to query asynchronously. - - Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. - Defaults to None. If a callable, it must take as input the - metadata dict of Document and return a bool. - - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - - Returns: - List of documents most similar to the query text with - L2 distance in float. Lower score represents more similarity. - """ - embedding = await self._aembed_query(query) - docs = await self.asimilarity_search_with_score_by_vector( - embedding, - k, - filter=filter, - fetch_k=fetch_k, - **kwargs, - ) - return docs - - def similarity_search_by_vector( - self, - embedding: List[float], - k: int = 4, - filter: Optional[Dict[str, Any]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Document]: - """Return docs most similar to embedding vector. - - Args: - embedding: Embedding to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. - Defaults to None. If a callable, it must take as input the - metadata dict of Document and return a bool. - - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - - Returns: - List of Documents most similar to the embedding. - """ - docs_and_scores = self.similarity_search_with_score_by_vector( - embedding, - k, - filter=filter, - fetch_k=fetch_k, - **kwargs, - ) - return [doc for doc, _ in docs_and_scores] - - async def asimilarity_search_by_vector( - self, - embedding: List[float], - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Document]: - """Return docs most similar to embedding vector asynchronously. - - Args: - embedding: Embedding to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. - Defaults to None. If a callable, it must take as input the - metadata dict of Document and return a bool. 
- - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - - Returns: - List of Documents most similar to the embedding. - """ - docs_and_scores = await self.asimilarity_search_with_score_by_vector( - embedding, - k, - filter=filter, - fetch_k=fetch_k, - **kwargs, - ) - return [doc for doc, _ in docs_and_scores] - - def similarity_search( - self, - query: str, - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Document]: - """Return docs most similar to query. - - Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter: (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - - Returns: - List of Documents most similar to the query. - """ - docs_and_scores = self.similarity_search_with_score( - query, k, filter=filter, fetch_k=fetch_k, **kwargs - ) - return [doc for doc, _ in docs_and_scores] - - async def asimilarity_search( - self, - query: str, - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Document]: - """Return docs most similar to query asynchronously. - - Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - filter: (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - fetch_k: (Optional[int]) Number of Documents to fetch before filtering. - Defaults to 20. - - Returns: - List of Documents most similar to the query. - """ - docs_and_scores = await self.asimilarity_search_with_score( - query, k, filter=filter, fetch_k=fetch_k, **kwargs - ) - return [doc for doc, _ in docs_and_scores] - - def max_marginal_relevance_search_with_score_by_vector( - self, - embedding: List[float], - *, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - ) -> List[Tuple[Document, float]]: - """Return docs and their similarity scores selected using the maximal marginal - relevance. - - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - embedding: Embedding to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch before filtering to - pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - Returns: - List of Documents and similarity scores selected by maximal marginal - relevance and score for each. - """ - scores, indices = self.index.search( - np.array([embedding], dtype=np.float32), - fetch_k if filter is None else fetch_k * 2, - ) - if filter is not None: - filter_func = self._create_filter_func(filter) - filtered_indices = [] - for i in indices[0]: - if i == -1: - # This happens when not enough docs are returned. - continue - _id = self.index_to_docstore_id[i] - doc = self.docstore.search(_id) - if not isinstance(doc, Document): - raise ValueError(f"Could not find document for id {_id}, got {doc}") - if filter_func(doc.metadata): - filtered_indices.append(i) - indices = np.array([filtered_indices]) - # -1 happens when not enough docs are returned. 
- embeddings = [self.index.reconstruct(int(i)) for i in indices[0] if i != -1] - mmr_selected = maximal_marginal_relevance( - np.array([embedding], dtype=np.float32), - embeddings, - k=k, - lambda_mult=lambda_mult, - ) - - docs_and_scores = [] - for i in mmr_selected: - if indices[0][i] == -1: - # This happens when not enough docs are returned. - continue - _id = self.index_to_docstore_id[indices[0][i]] - doc = self.docstore.search(_id) - if not isinstance(doc, Document): - raise ValueError(f"Could not find document for id {_id}, got {doc}") - docs_and_scores.append((doc, scores[0][i])) - - return docs_and_scores - - async def amax_marginal_relevance_search_with_score_by_vector( - self, - embedding: List[float], - *, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - ) -> List[Tuple[Document, float]]: - """Return docs and their similarity scores selected using the maximal marginal - relevance asynchronously. - - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - embedding: Embedding to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch before filtering to - pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - Returns: - List of Documents and similarity scores selected by maximal marginal - relevance and score for each. - """ - # This is a temporary workaround to make the similarity search asynchronous. - return await run_in_executor( - None, - self.max_marginal_relevance_search_with_score_by_vector, - embedding, - k=k, - fetch_k=fetch_k, - lambda_mult=lambda_mult, - filter=filter, - ) - - def max_marginal_relevance_search_by_vector( - self, - embedding: List[float], - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs selected using the maximal marginal relevance. - - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - embedding: Embedding to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch before filtering to - pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - Returns: - List of Documents selected by maximal marginal relevance. - """ - docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector( - embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter - ) - return [doc for doc, _ in docs_and_scores] - - async def amax_marginal_relevance_search_by_vector( - self, - embedding: List[float], - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs selected using the maximal marginal relevance asynchronously. - - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - embedding: Embedding to look up documents similar to. - k: Number of Documents to return. Defaults to 4. 
- fetch_k: Number of Documents to fetch before filtering to - pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - Returns: - List of Documents selected by maximal marginal relevance. - """ - docs_and_scores = ( - await self.amax_marginal_relevance_search_with_score_by_vector( - embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter - ) - ) - return [doc for doc, _ in docs_and_scores] - - def max_marginal_relevance_search( - self, - query: str, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs selected using the maximal marginal relevance. - - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch before filtering (if needed) to - pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - Returns: - List of Documents selected by maximal marginal relevance. - """ - embedding = self._embed_query(query) - docs = self.max_marginal_relevance_search_by_vector( - embedding, - k=k, - fetch_k=fetch_k, - lambda_mult=lambda_mult, - filter=filter, - **kwargs, - ) - return docs - - async def amax_marginal_relevance_search( - self, - query: str, - k: int = 4, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs selected using the maximal marginal relevance asynchronously. - - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch before filtering (if needed) to - pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - Returns: - List of Documents selected by maximal marginal relevance. - """ - embedding = await self._aembed_query(query) - docs = await self.amax_marginal_relevance_search_by_vector( - embedding, - k=k, - fetch_k=fetch_k, - lambda_mult=lambda_mult, - filter=filter, - **kwargs, - ) - return docs - - def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: - """Delete by ID. These are the IDs in the vectorstore. - - Args: - ids: List of ids to delete. - - Returns: - Optional[bool]: True if deletion is successful, - False otherwise, None if not implemented. - """ - if ids is None: - raise ValueError("No ids provided to delete.") - missing_ids = set(ids).difference(self.index_to_docstore_id.values()) - if missing_ids: - raise ValueError( - f"Some specified ids do not exist in the current store. 
Ids not found: " - f"{missing_ids}" - ) - - reversed_index = {id_: idx for idx, id_ in self.index_to_docstore_id.items()} - index_to_delete = {reversed_index[id_] for id_ in ids} - - self.index.remove_ids(np.fromiter(index_to_delete, dtype=np.int64)) - self.docstore.delete(ids) - - remaining_ids = [ - id_ - for i, id_ in sorted(self.index_to_docstore_id.items()) - if i not in index_to_delete - ] - self.index_to_docstore_id = {i: id_ for i, id_ in enumerate(remaining_ids)} - - return True - - def merge_from(self, target: FAISS) -> None: - """Merge another FAISS object with the current one. - - Add the target FAISS to the current one. - - Args: - target: FAISS object you wish to merge into the current one - - Returns: - None. - """ - if not isinstance(self.docstore, AddableMixin): - raise ValueError("Cannot merge with this type of docstore") - # Numerical index for target docs are incremental on existing ones - starting_len = len(self.index_to_docstore_id) - - # Merge two IndexFlatL2 - self.index.merge_from(target.index) - - # Get id and docs from target FAISS object - full_info = [] - for i, target_id in target.index_to_docstore_id.items(): - doc = target.docstore.search(target_id) - if not isinstance(doc, Document): - raise ValueError("Document should be returned") - full_info.append((starting_len + i, target_id, doc)) - - # Add information to docstore and index_to_docstore_id. - self.docstore.add({_id: doc for _, _id, doc in full_info}) - index_to_id = {index: _id for index, _id, _ in full_info} - self.index_to_docstore_id.update(index_to_id) - - @classmethod - def __from( - cls, - texts: Iterable[str], - embeddings: List[List[float]], - embedding: Embeddings, - metadatas: Optional[Iterable[dict]] = None, - ids: Optional[List[str]] = None, - normalize_L2: bool = False, - distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE, - **kwargs: Any, - ) -> FAISS: - faiss = dependable_faiss_import() - if distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT: - index = faiss.IndexFlatIP(len(embeddings[0])) - else: - # Default to L2, currently other metric types not initialized. - index = faiss.IndexFlatL2(len(embeddings[0])) - docstore = kwargs.pop("docstore", InMemoryDocstore()) - index_to_docstore_id = kwargs.pop("index_to_docstore_id", {}) - vecstore = cls( - embedding, - index, - docstore, - index_to_docstore_id, - normalize_L2=normalize_L2, - distance_strategy=distance_strategy, - **kwargs, - ) - vecstore.__add(texts, embeddings, metadatas=metadatas, ids=ids) - return vecstore - - @classmethod - def from_texts( - cls, - texts: List[str], - embedding: Embeddings, - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - **kwargs: Any, - ) -> FAISS: - """Construct FAISS wrapper from raw documents. - - This is a user friendly interface that: - 1. Embeds documents. - 2. Creates an in memory docstore - 3. Initializes the FAISS database - - This is intended to be a quick way to get started. - - Example: - .. 
code-block:: python - - from langchain_community.vectorstores import FAISS - from langchain_community.embeddings import OpenAIEmbeddings - - embeddings = OpenAIEmbeddings() - faiss = FAISS.from_texts(texts, embeddings) - """ - embeddings = embedding.embed_documents(texts) - return cls.__from( - texts, - embeddings, - embedding, - metadatas=metadatas, - ids=ids, - **kwargs, - ) - - @classmethod - async def afrom_texts( - cls, - texts: list[str], - embedding: Embeddings, - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - **kwargs: Any, - ) -> FAISS: - """Construct FAISS wrapper from raw documents asynchronously. - - This is a user friendly interface that: - 1. Embeds documents. - 2. Creates an in memory docstore - 3. Initializes the FAISS database - - This is intended to be a quick way to get started. - - Example: - .. code-block:: python - - from langchain_community.vectorstores import FAISS - from langchain_community.embeddings import OpenAIEmbeddings - - embeddings = OpenAIEmbeddings() - faiss = await FAISS.afrom_texts(texts, embeddings) - """ - embeddings = await embedding.aembed_documents(texts) - return cls.__from( - texts, - embeddings, - embedding, - metadatas=metadatas, - ids=ids, - **kwargs, - ) - - @classmethod - def from_embeddings( - cls, - text_embeddings: Iterable[Tuple[str, List[float]]], - embedding: Embeddings, - metadatas: Optional[Iterable[dict]] = None, - ids: Optional[List[str]] = None, - **kwargs: Any, - ) -> FAISS: - """Construct FAISS wrapper from raw documents. - - This is a user friendly interface that: - 1. Embeds documents. - 2. Creates an in memory docstore - 3. Initializes the FAISS database - - This is intended to be a quick way to get started. - - Example: - .. code-block:: python - - from langchain_community.vectorstores import FAISS - from langchain_community.embeddings import OpenAIEmbeddings - - embeddings = OpenAIEmbeddings() - text_embeddings = embeddings.embed_documents(texts) - text_embedding_pairs = zip(texts, text_embeddings) - faiss = FAISS.from_embeddings(text_embedding_pairs, embeddings) - """ - texts, embeddings = zip(*text_embeddings) - return cls.__from( - list(texts), - list(embeddings), - embedding, - metadatas=metadatas, - ids=ids, - **kwargs, - ) - - @classmethod - async def afrom_embeddings( - cls, - text_embeddings: Iterable[Tuple[str, List[float]]], - embedding: Embeddings, - metadatas: Optional[Iterable[dict]] = None, - ids: Optional[List[str]] = None, - **kwargs: Any, - ) -> FAISS: - """Construct FAISS wrapper from raw documents asynchronously.""" - return cls.from_embeddings( - text_embeddings, - embedding, - metadatas=metadatas, - ids=ids, - **kwargs, - ) - - def save_local(self, folder_path: str, index_name: str = "index") -> None: - """Save FAISS index, docstore, and index_to_docstore_id to disk. - - Args: - folder_path: folder path to save index, docstore, - and index_to_docstore_id to. 
- index_name: for saving with a specific index file name - """ - path = Path(folder_path) - path.mkdir(exist_ok=True, parents=True) - - # save index separately since it is not picklable - faiss = dependable_faiss_import() - faiss.write_index(self.index, str(path / f"{index_name}.faiss")) - - # save docstore and index_to_docstore_id - with open(path / f"{index_name}.pkl", "wb") as f: - pickle.dump((self.docstore, self.index_to_docstore_id), f) - - @classmethod - def load_local( - cls, - folder_path: str, - embeddings: Embeddings, - index_name: str = "index", - *, - allow_dangerous_deserialization: bool = False, - **kwargs: Any, - ) -> FAISS: - """Load FAISS index, docstore, and index_to_docstore_id from disk. - - Args: - folder_path: folder path to load index, docstore, - and index_to_docstore_id from. - embeddings: Embeddings to use when generating queries - index_name: for saving with a specific index file name - allow_dangerous_deserialization: whether to allow deserialization - of the data which involves loading a pickle file. - Pickle files can be modified by malicious actors to deliver a - malicious payload that results in execution of - arbitrary code on your machine. - asynchronous: whether to use async version or not - """ - if not allow_dangerous_deserialization: - raise ValueError( - "The de-serialization relies loading a pickle file. " - "Pickle files can be modified to deliver a malicious payload that " - "results in execution of arbitrary code on your machine." - "You will need to set `allow_dangerous_deserialization` to `True` to " - "enable deserialization. If you do this, make sure that you " - "trust the source of the data. For example, if you are loading a " - "file that you created, and no that no one else has modified the file, " - "then this is safe to do. Do not set this to `True` if you are loading " - "a file from an untrusted source (e.g., some random site on the " - "internet.)." - ) - path = Path(folder_path) - # load index separately since it is not picklable - faiss = dependable_faiss_import() - index = faiss.read_index(str(path / f"{index_name}.faiss")) - - # load docstore and index_to_docstore_id - with open(path / f"{index_name}.pkl", "rb") as f: - docstore, index_to_docstore_id = pickle.load(f) - return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs) - - def serialize_to_bytes(self) -> bytes: - """Serialize FAISS index, docstore, and index_to_docstore_id to bytes.""" - return pickle.dumps((self.index, self.docstore, self.index_to_docstore_id)) - - @classmethod - def deserialize_from_bytes( - cls, - serialized: bytes, - embeddings: Embeddings, - **kwargs: Any, - ) -> FAISS: - """Deserialize FAISS index, docstore, and index_to_docstore_id from bytes.""" - index, docstore, index_to_docstore_id = pickle.loads(serialized) - return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs) - - def _select_relevance_score_fn(self) -> Callable[[float], float]: - """ - The 'correct' relevance function - may differ depending on a few things, including: - - the distance / similarity metric used by the VectorStore - - the scale of your embeddings (OpenAI's are unit normed. Many others are not!) - - embedding dimensionality - - etc. 
- """ - if self.override_relevance_score_fn is not None: - return self.override_relevance_score_fn - - # Default strategy is to rely on distance strategy provided in - # vectorstore constructor - if self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT: - return self._max_inner_product_relevance_score_fn - elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE: - # Default behavior is to use euclidean distance relevancy - return self._euclidean_relevance_score_fn - elif self.distance_strategy == DistanceStrategy.COSINE: - return self._cosine_relevance_score_fn - else: - raise ValueError( - "Unknown distance strategy, must be cosine, max_inner_product," - " or euclidean" - ) - - def _similarity_search_with_relevance_scores( - self, - query: str, - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Tuple[Document, float]]: - """Return docs and their similarity scores on a scale from 0 to 1.""" - # Pop score threshold so that only relevancy scores, not raw scores, are - # filtered. - relevance_score_fn = self._select_relevance_score_fn() - if relevance_score_fn is None: - raise ValueError( - "normalize_score_fn must be provided to" - " FAISS constructor to normalize scores" - ) - docs_and_scores = self.similarity_search_with_score( - query, - k=k, - filter=filter, - fetch_k=fetch_k, - **kwargs, - ) - docs_and_rel_scores = [ - (doc, relevance_score_fn(score)) for doc, score in docs_and_scores - ] - return docs_and_rel_scores - - async def _asimilarity_search_with_relevance_scores( - self, - query: str, - k: int = 4, - filter: Optional[Union[Callable, Dict[str, Any]]] = None, - fetch_k: int = 20, - **kwargs: Any, - ) -> List[Tuple[Document, float]]: - """Return docs and their similarity scores on a scale from 0 to 1.""" - # Pop score threshold so that only relevancy scores, not raw scores, are - # filtered. - relevance_score_fn = self._select_relevance_score_fn() - if relevance_score_fn is None: - raise ValueError( - "normalize_score_fn must be provided to" - " FAISS constructor to normalize scores" - ) - docs_and_scores = await self.asimilarity_search_with_score( - query, - k=k, - filter=filter, - fetch_k=fetch_k, - **kwargs, - ) - docs_and_rel_scores = [ - (doc, relevance_score_fn(score)) for doc, score in docs_and_scores - ] - return docs_and_rel_scores - - @staticmethod - def _create_filter_func( - filter: Optional[Union[Callable, Dict[str, Any]]], - ) -> Callable[[Dict[str, Any]], bool]: - """ - Create a filter function based on the provided filter. - - Args: - filter: A callable or a dictionary representing the filter - conditions for documents. - - Returns: - Callable[[Dict[str, Any]], bool]: A function that takes Document's metadata - and returns True if it satisfies the filter conditions, otherwise False. 
- """ - if callable(filter): - return filter - - if not isinstance(filter, dict): - raise ValueError( - f"filter must be a dict of metadata or a callable, not {type(filter)}" - ) - - def filter_func(metadata: Dict[str, Any]) -> bool: - return all( - metadata.get(key) in value - if isinstance(value, list) - else metadata.get(key) == value - for key, value in filter.items() # type: ignore - ) - - return filter_func diff --git a/notebooks/experiment.ipynb b/notebooks/experiment.ipynb new file mode 100644 index 0000000..03121cc --- /dev/null +++ b/notebooks/experiment.ipynb @@ -0,0 +1,248 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "75c1d280-98f1-4dfc-9886-670dcce85da6", + "metadata": {}, + "source": [ + "## Sample Experiments" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "8af047ea-691c-49a7-a685-c669ba615621", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "c8ccd147-ae07-40c1-872b-4a0d1bda44f4", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/anaconda3/envs/ragas/lib/python3.10/site-packages/pydantic/_internal/_fields.py:151: UserWarning: Field \"model_prompt\" has conflict with protected namespace \"model_\".\n", + "\n", + "You may be able to resolve this warning by setting `model_config['protected_namespaces'] = ()`.\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "from ragbench import RAGBuilder\n", + "from ragbench import ComponentConfig, Config, DirectoryConfig, RetrieverConfig" + ] + }, + { + "cell_type": "markdown", + "id": "06bc57cd-a1f8-4f73-b830-83b296353095", + "metadata": {}, + "source": [ + "## Build RAG" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "f93926a2-49ea-4ab9-acb6-9830dd1ae5a6", + "metadata": {}, + "outputs": [], + "source": [ + "directory = DirectoryConfig(name=\"../datasets/Sample_Docs_Markdown/\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "b3eba3bc-9213-4f3e-bd18-367dc656cbf3", + "metadata": {}, + "outputs": [], + "source": [ + "config = Config(directory=directory)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "18749ad2-877c-489d-a646-345d0ece98a2", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|███████████████████████████████████████████████| 1/1 [00:05<00:00, 5.93s/it]\n" + ] + } + ], + "source": [ + "rag = RAGBuilder(config)" + ] + }, + { + "cell_type": "markdown", + "id": "dcbc696e-ff88-4109-9b7c-a52f193921dc", + "metadata": {}, + "source": [ + "### Evaluate" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "dc2747ad-bc6c-4e6a-bf41-23ec6d526146", + "metadata": {}, + "outputs": [], + "source": [ + "from ragas.dataset_schema import SingleTurnSample, EvaluationDataset" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "87834cc1-e93b-4ba0-9b55-5261cb33c5e7", + "metadata": {}, + "outputs": [], + "source": [ + "sample = SingleTurnSample(user_input=\"What is an MoE?\", reference=\"MoE is something\")\n", + "dataset = EvaluationDataset(samples=[sample,sample])" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "97edcea2-be05-476e-80f5-588c75d59241", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
" + ], + "text/plain": [ + " user_input reference\n", + "0 What is an MoE? MoE is something\n", + "1 What is an MoE? MoE is something" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dataset.to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "bc7f139d-86c4-4b8f-92b7-465377152f29", + "metadata": {}, + "outputs": [], + "source": [ + "from ragas.metrics import ContextPrecision, FactualCorrectness\n", + "from ragas.llms import llm_factory\n", + "llm = llm_factory()\n", + "metrics = [FactualCorrectness()]" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "bba7061f-205d-4e6f-b880-3b2fe781990f", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "60b39fbcb1a747e794fbe9456d3ca389", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Evaluating: 0%| | 0/2 [00:00=3.6" +license = {text = "MIT"} +authors = [ + {name = "Ikka", email = "shahul@explodinggradients.com"} +] +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] +dependencies = [ + "numpy>=1.19.0", + "requests>=2.25.0" +] + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.black] +line-length = 88 +target-version = ['py39'] +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.venv + | _build + | build + | dist + | pyproject\.toml # Exclude pyproject.toml + | .*\.pyc # Exclude compiled Python files + # Add other files or patterns to exclude +)/ +''' \ No newline at end of file diff --git a/rag_utils.py b/rag_utils.py deleted file mode 100755 index 93234f4..0000000 --- a/rag_utils.py +++ /dev/null @@ -1,361 +0,0 @@ -import torch -from datasets import load_dataset -from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline -# from langchain.docstore.document import Document -from langchain.text_splitter import RecursiveCharacterTextSplitter -# from langchain.vectorstores import FAISS # deprecated -# from langchain_community.vectorstores import FAISS # use custom FAISS lib -from customized_faiss import FAISS -from langchain_core.documents import Document -# from langchain_community.embeddings import HuggingFaceEmbeddings # deprecated -from langchain_huggingface import HuggingFaceEmbeddings -from langchain_community.vectorstores.utils import DistanceStrategy -from tqdm import tqdm -from pathlib import Path -from datetime import datetime -from typing import List, Dict -import dill -from ragatouille import RAGPretrainedModel - - - -def cache_object(cache_dir, obj, name, verbose=True): - if cache_dir is None: - return False - - cache_path = f'{cache_dir}/cached_{name}.pkl' - with open(cache_path, 'wb') as f: - if verbose: print(f'- Caching {name} @ "{cache_path}"') - dill.dump(obj, f) - return True - - - -class VectorBase(): - def __init__(self, cache_dir: str, verbose=True): - self.cache_dir = cache_dir - if cache_dir is not None: - Path(self.cache_dir).mkdir(parents=True, exist_ok=True) - self.verbose = verbose - - - def cache_vector_db(self): - cache_path = f'{self.cache_dir}/cached_vector_db' - print(f'- Caching Vector DB at "{cache_path}"') - self.vector_db.save_local(cache_path) - - - # Load Cached Objects - def load_obj_from_cache(self, name): - cache_path = f'{self.cache_dir}/cached_{name}.pkl' - if Path(cache_path).is_file(): - with open(cache_path, 'rb') as f: - print(f'- Loading {name} from Cache: "{cache_path}"') - loaded_obj = 
dill.load(f) - match name: - case 'docs': - self.ls_docs = loaded_obj - case 'chunks': - self.ls_chunks = loaded_obj - case _: - raise NotImplementedError() - return True - return False - - - def load_vector_db_from_cache(self): - cache_path = f'{self.cache_dir}/cached_vector_db' - if Path(cache_path).is_dir(): - print(f'- Loading Vector DB from Cache: "{cache_path}"') - self.vector_db = FAISS.load_local( - cache_path, - self.embedding_model, - distance_strategy=DistanceStrategy.COSINE, - allow_dangerous_deserialization=True, - ) - return True - return False - - - # Load Docs Methods - def load_docs_from_hf(self, dataset_id: str, subset: str, split: str, column: str): - self.hf_dataset_id = dataset_id - self.hf_dataset_subset = subset - self.hf_dataset_split = split - self.hf_dataset_column = column - - print(f'- Loading Corpus Dataset from HF: {self.hf_dataset_id} ({self.hf_dataset_subset})') - ds = load_dataset(dataset_id, subset, split=split) - self.ls_docs = [] - for row in tqdm(ds, desc='- Converting to LangChain Document'): - row = row.copy() - content = row[column] - del row[column] - metadata = row | {'hf_ds_id': dataset_id, 'hf_ds_subset': subset, 'hf_ds_split': split, 'hf_ds_col': column} - self.ls_docs.append(Document(page_content=content, metadata=metadata)) - - cache_object(self.cache_dir, self.ls_docs, 'docs') - - - # Load Embedding Model Methods - def load_embedding_model_from_hf(self, model_id: str, device: str ='cuda:0', max_seq_len: int =None): - self.hf_embedding_model_id = model_id - - print(f'- Loading Embedding Model & Tokenizer: {self.hf_embedding_model_id}') - self.embedding_model = HuggingFaceEmbeddings( - model_name=model_id, - multi_process=True, # multi GPU - model_kwargs={'device': device}, - encode_kwargs={"normalize_embeddings": True}, # set True for cosine similarity - ) - - self.embedding_tokenizer = AutoTokenizer.from_pretrained(model_id) - - if max_seq_len is None: - self.chunk_size = self.embedding_model.client.max_seq_length - else: - self.chunk_size = max_seq_len - print(f'- Chunk Size (#Tokens): {self.chunk_size}') # todo: make this a parameter too - - - def split_docs(self, percent_overlap: float =0.1): - self.ls_text_seps = ["\n#{1,6} ", "\n\n", "\n", " ", ""] - - self.text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer( - self.embedding_tokenizer, - chunk_size=self.chunk_size, - chunk_overlap=int(self.chunk_size * percent_overlap), - add_start_index=True, - strip_whitespace=True, - separators=self.ls_text_seps, - ) - - ls_chunks_temp= [] - for doc in tqdm(self.ls_docs, desc='- Splitting Documents to Chunks'): - ls_chunks_temp += self.text_splitter.split_documents([doc]) - - set_unique_texts = set() - ls_unique_chunks = [] - for chunk in tqdm(ls_chunks_temp, desc='- Removing Duplicated Chunks'): - if chunk.page_content not in set_unique_texts: - set_unique_texts.add(chunk.page_content) - ls_unique_chunks.append(chunk) - - self.ls_chunks = ls_unique_chunks - cache_object(self.cache_dir, self.ls_chunks, 'chunks') - - print(f'- {len(self.ls_docs):,} Documents splitted into {len(self.ls_chunks):,} Chunks') - - - def prepare_vector_db(self): - def get_cur_time(): - return datetime.now().isoformat() - - time_start = get_cur_time() - print(f'- Vector DB: Start Embedding at {time_start}') - - self.vector_db = FAISS.from_documents( - self.ls_chunks, - self.embedding_model, - distance_strategy=DistanceStrategy.COSINE, - ) - - time_end = get_cur_time() - print(f'- Vector DB: Finished Embedding at {time_end}') - - if self.cache_dir is not 
None: - self.cache_vector_db() - - - @staticmethod - def from_yaml_config(config: dict): - # The order of the operations are important. - vec = VectorBase(config['vb_cache_dir'], config['vb_verbose']) - vec.config = config - - if not vec.load_obj_from_cache('docs'): - vec.load_docs_from_hf(config['docs_dataset_id'], config['docs_dataset_subset'], config['docs_dataset_split'], config['docs_dataset_column']) - - vec.load_embedding_model_from_hf(config['embedding_model_id'], config['embedding_model_device'], config['chunk_size']) - - if not vec.load_obj_from_cache('chunks'): - vec.split_docs(config['split_overlap']) - - if not vec.load_vector_db_from_cache(): - vec.prepare_vector_db() - - return vec - - - -class Retriever(): - def __init__(self, vb: VectorBase, cache_dir: str, num_retrievals: int =10, verbose: bool =True): - self.vb = vb - self.cache_dir = cache_dir - Path(self.cache_dir).mkdir(parents=True, exist_ok=True) - self.num_retrievals = num_retrievals - self.verbose = verbose - - - def load_queries_from_cache(self): - cache_path = f'{self.cache_dir}/cached_queries.pkl' - if Path(cache_path).is_file(): - with open(cache_path, 'rb') as f: - print(f'- Loading Queries from Cache: "{cache_path}"') - self.ls_queries = dill.load(f) - return True - return False - - - def load_queries_from_hf(self, dataset_id: str, subset: str, split: str, column: str): - self.hf_dataset_id = dataset_id - self.hf_dataset_subset = subset - self.hf_dataset_split = split - self.hf_dataset_column = column - - print(f'- Loading Queries Dataset from HF: {self.hf_dataset_id} ({self.hf_dataset_subset})') - ds = load_dataset(dataset_id, subset, split=split) - self.ls_queries = ds[column] - - cache_object(self.cache_dir, self.ls_queries, 'queries') - - - def load_rets_from_cache(self): - cache_path = f'{self.cache_dir}/cached_rets.pkl' - if Path(cache_path).is_file(): - with open(cache_path, 'rb') as f: - print(f'- Loading Rets from Cache: "{cache_path}"') - self.ls_rets = dill.load(f) - return True - return False - - - def retrieve(self) -> Dict[str, List[Document]]: - ls_ls_docs = self.vb.vector_db.batch_similarity_search(self.ls_queries, self.num_retrievals, self.verbose) - self.ls_rets = {k:v for k, v in zip(self.ls_queries, ls_ls_docs)} - cache_object(self.cache_dir, self.ls_rets, 'rets') - - - @staticmethod - def from_yaml_config(vb: VectorBase, config: dict): - ret = Retriever(vb, config['rets_cache_dir'], config['num_retrievals'], config['rets_verbose']) - # The order of the operations are important. 
- - if not ret.load_queries_from_cache(): - ret.load_queries_from_hf(config['queries_dataset_id'], config['queries_dataset_subset'], config['queries_dataset_split'], config['queries_dataset_column']) - - if not ret.load_rets_from_cache(): - ret.retrieve() - - return ret - - - -class Generator(): - def __init__(self, ret: Retriever, cache_dir: str, num_selections: int, system_prompt: str, context_prompt: str): - self.ret = ret - self.cache_dir = cache_dir - self.num_selections = num_selections - self.system_prompt = system_prompt - self.context_prompt = context_prompt - - - # Load Cached Objects - def load_obj_from_cache(self, name): - cache_path = f'{self.cache_dir}/cached_{name}.pkl' - if Path(cache_path).is_file(): - with open(cache_path, 'rb') as f: - print(f'- Loading {name} from Cache: "{cache_path}"') - loaded_obj = dill.load(f) - match name: - case 'reranked_rets': - self.ls_reranked_rets = loaded_obj - case 'augmented_generations': - self.ls_augmented_generations = loaded_obj - case _: - raise NotImplementedError() - return True - return False - - - def load_generator_model_from_hf(self, model_id, generation_config, device='cuda:0', torch_dtype=torch.bfloat16, trust_remote_code=True): - self.hf_generator_model_id = model_id - self.generation_config = generation_config - print(f'- Loading Generator Model & Tokenizer: "{self.hf_generator_model_id}"') - tokenizer = AutoTokenizer.from_pretrained(model_id) - model = AutoModelForCausalLM.from_pretrained(model_id, device_map=device, torch_dtype=torch_dtype, trust_remote_code=trust_remote_code) - self.pipe = pipeline( - "text-generation", model=model, tokenizer=tokenizer - ) - # # this is bug: https://github.com/langchain-ai/langchain/issues/22776#issue-2346538588 - # self.generator_model = HuggingFacePipeline(pipeline=pipe) - # self.generator_model_output_parser = StrOutputParser() - - - def load_reranker(self, model_id): - self.reranker = RAGPretrainedModel.from_pretrained(model_id) - - - def augmented_generate(self, query: str, context: str) -> str: - messages = [ - {'role': 'system', 'content': self.system_prompt}, - {'role': 'user', 'content': self.context_prompt.format(CONTEXT=context, QUERY=query)}, - ] - - # generation_args = { - # "max_new_tokens": 8, - # "return_full_text": False, - # "temperature": 0.0, - # "do_sample": False, - # } - - output = self.pipe(messages, **self.generation_config) - return output[0]['generated_text'] - - - def rerank_rets(self): - ls_rets = self.ret.ls_rets - self.ls_reranked_rets = {} - - for query, rets in tqdm(ls_rets.items(), total=len(ls_rets), desc='Reranking Retrievals'): - if self.reranker is None: - self.ls_reranked_rets[query] = [doc.page_content for doc in rets] - else: - relevant_docs_reranked = self.reranker.rerank(query, - [doc.page_content for doc in rets], - k=self.num_selections) - self.ls_reranked_rets[query] = [doc['content'] for doc in relevant_docs_reranked] - - cache_object(self.cache_dir, self.ls_reranked_rets, 'reranked_rets') - - - def generate_responses(self): - self.ls_augmented_generations = {} - - for query, relevant_docs in tqdm(self.ls_reranked_rets.items(), total=len(self.ls_reranked_rets)): - docs_prompt = [f"Document {str(i)}:::\n{doc}" for i, doc in enumerate(relevant_docs)] - prompt_context = '\nExtracted documents:\n' + '\n'.join(docs_prompt) - - answer = self.augmented_generate(query, prompt_context) - self.ls_augmented_generations[query] = answer - - - cache_object(self.cache_dir, self.ls_augmented_generations, 'augmented_generations') - - - @staticmethod - def 
from_yaml_config(ret: Retriever, config: dict): - gen = Generator(ret, config['gens_cache_dir'], config['num_selections'], config['system_prompt'], config['context_prompt']) - gen.load_generator_model_from_hf(config['generator_model_id'], config['generator_model_config'], config['generator_model_device'], config['generator_model_torch_dtype'], config['generator_model_trust_remote_code']) - gen.load_reranker(config['reranker_model_id']) - - if not gen.load_obj_from_cache('reranked_rets'): - gen.rerank_rets() - - if not gen.load_obj_from_cache('augmented_generations'): - gen.generate_responses() - - return gen - - - diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7f4c315 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +langchain_community +black +git+https://github.com/explodinggradients/ragas.git \ No newline at end of file diff --git a/results/default_experiment/config.json b/results/default_experiment/config.json new file mode 100644 index 0000000..a73c4f2 --- /dev/null +++ b/results/default_experiment/config.json @@ -0,0 +1,36 @@ +{ + "directory": { + "name": "../datasets/Sample_Docs_Markdown/", + "params": { + "sample_size": 1, + "show_progress": true + } + }, + "experiment_name": "default_experiment", + "text_splitter": { + "class_path": "langchain_text_splitters.RecursiveCharacterTextSplitter", + "params": { + "chunk_size": 512, + "chunk_overlap": 100 + } + }, + "embedding_model": { + "class_path": "langchain_openai.OpenAIEmbeddings", + "params": {} + }, + "llm": { + "class_path": "langchain_openai.ChatOpenAI", + "params": { + "model": "gpt-4o" + } + }, + "retriever": { + "type": "vanilla", + "params": { + "search_kwargs": { + "k": 2 + } + } + }, + "model_prompt": "Answer the user query based on the context" +} \ No newline at end of file diff --git a/results/default_experiment/results.csv b/results/default_experiment/results.csv new file mode 100644 index 0000000..e2a9350 --- /dev/null +++ b/results/default_experiment/results.csv @@ -0,0 +1,5 @@ +,user_input,retrieved_contexts,response,reference,factual_correctness +0,What is an MoE?,"['Mixture of Experts (MoE), in the context of modern deep learning architectures, was proven effective in Shazeer et al. (2017). That work added an MoE layer which was stacked between LSTM (Hochreiter and Schmidhuber, 1997) layers, and tokens were separately routed to combinations of experts. This resulted in state-of-the-art results in language modeling and machine translation benchmarks. The MoE layer was reintroduced into the Transformer architecture by the Mesh Tensorflow library (Shazeer et al., 2018)' + 'In deep learning, models typically reuse the same parameters for all inputs. Mixture of Experts (MoE) models defy this and instead select different parameters for each incoming example. The result is a sparsely-activated model—with an outrageous number of parameters—but a constant computational cost. However, despite several notable successes of MoE, widespread adoption has been hindered by complexity, communication costs, and training instability. We address these with the introduction of the Switch']","Mixture of Experts (MoE) is a model in modern deep learning architectures that selects different parameters for each incoming example, resulting in a sparsely-activated model with a large number of parameters but a constant computational cost. It was proven effective in Shazeer et al. (2017) and reintroduced into the Transformer architecture by the Mesh Tensorflow library. 
Despite its successes, widespread adoption has been hindered by complexity, communication costs, and training instability.",MoE is something,0.0 +1,What is an MoE?,"['Mixture of Experts (MoE), in the context of modern deep learning architectures, was proven effective in Shazeer et al. (2017). That work added an MoE layer which was stacked between LSTM (Hochreiter and Schmidhuber, 1997) layers, and tokens were separately routed to combinations of experts. This resulted in state-of-the-art results in language modeling and machine translation benchmarks. The MoE layer was reintroduced into the Transformer architecture by the Mesh Tensorflow library (Shazeer et al., 2018)' + 'In deep learning, models typically reuse the same parameters for all inputs. Mixture of Experts (MoE) models defy this and instead select different parameters for each incoming example. The result is a sparsely-activated model—with an outrageous number of parameters—but a constant computational cost. However, despite several notable successes of MoE, widespread adoption has been hindered by complexity, communication costs, and training instability. We address these with the introduction of the Switch']","Mixture of Experts (MoE) is a model in modern deep learning architectures that selects different parameters for each incoming example, resulting in a sparsely-activated model with a large number of parameters but a constant computational cost. It was proven effective in Shazeer et al. (2017) and reintroduced into the Transformer architecture by the Mesh Tensorflow library. Despite its successes, widespread adoption has been hindered by complexity, communication costs, and training instability.",MoE is something,0.0 diff --git a/results/readme.md b/results/readme.md new file mode 100644 index 0000000..c264bab --- /dev/null +++ b/results/readme.md @@ -0,0 +1 @@ +Log important results here \ No newline at end of file diff --git a/ret_evals.py b/ret_evals.py deleted file mode 100755 index 760a512..0000000 --- a/ret_evals.py +++ /dev/null @@ -1,104 +0,0 @@ -from tqdm import tqdm -import argparse, json - - -def mrr_at(k, ls_rets, ls_golds): - ls_mrr = [] - - for i, rets, golds in (pbar := tqdm(zip(range(len(ls_golds)), ls_rets, ls_golds))): - first_relevant_rank = None - - golds_stripped = [''.join(gold.split()) for gold in golds] - rets_stripped = [''.join(ret.split()) for ret in rets] - - for r, ret_item in enumerate(rets_stripped): - if any(gold_item in ret_item for gold_item in golds_stripped): - if r < k: - if first_relevant_rank is None: - first_relevant_rank = r + 1 - - ls_mrr.append(1 / first_relevant_rank if first_relevant_rank else 0) - pbar.set_description(f"MRR@{k} {sum(ls_mrr) / len(ls_golds):.4f}") - - return sum(ls_mrr) / len(ls_golds) - - -def map_at(k, ls_rets, ls_golds): - ls_apk = [] - for i, rets, golds in (pbar := tqdm(zip(range(len(ls_golds)), ls_rets, ls_golds))): - ap_sum = 0 - found_golds = [] - - golds_stripped = [''.join(gold.split()) for gold in golds] - rets_stripped = [''.join(ret.split()) for ret in rets] - - for r, ret_item in enumerate(rets_stripped): - if any(gold_item in ret_item for gold_item in golds_stripped): - if r < k: - # Compute precision at this rank for this query - count = 0 - for gold_item in golds_stripped: - if gold_item in ret_item and not gold_item in found_golds: - count = count + 1 - found_golds.append(gold_item) - p_at_r = count / (r+1) - ap_sum += p_at_r - - # Calculate metrics for this query - ls_apk.append(ap_sum / min(len(golds_stripped), k)) - 
pbar.set_description(f"MAP@{k} {sum(ls_apk) / len(ls_golds):.4f}") - - return sum(ls_apk) / len(ls_golds) - - -def hits_at(k, ls_rets, ls_golds): - hits = 0 - for i, rets, golds in (pbar := tqdm(zip(range(len(ls_golds)), ls_rets, ls_golds))): - is_hit = False - golds_stripped = [''.join(gold.split()) for gold in golds] - rets_stripped = [''.join(ret.split()) for ret in rets] - - for ret_item in rets_stripped[:k]: - if any(gold_item in ret_item for gold_item in golds_stripped): - is_hit = True - - hits += int(is_hit) - pbar.set_description(f"Hits@{k} {hits/(i+1):.4f}") - - return hits / len(ls_golds) - - - - - -def main_eval(file_name): - print(f'For file: {file_name}') - with open(file_name, 'r') as file: - data = json.load(file) - retrieved_lists = [] - gold_lists = [] - - for d in data: - if d['question_type'] == 'null_query': - continue - retrieved_lists.append([m['text'] for m in d['retrieval_list']]) - gold_lists.append([m['fact'] for m in d['gold_list']]) - - # Calculate metrics - hit10 = hits_at(10, retrieved_lists, gold_lists) - hit4 = hits_at(4, retrieved_lists, gold_lists) - map10 = map_at(10, retrieved_lists, gold_lists) - mrr10 = mrr_at(10, retrieved_lists, gold_lists) - - print(hit10) - print(hit4) - print(map10) - print(mrr10) - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--file', type=str, required=True, help='File Name') - args = parser.parse_args() - - main_eval(args.file) - \ No newline at end of file diff --git a/run_rag.py b/run_rag.py deleted file mode 100755 index fdb312f..0000000 --- a/run_rag.py +++ /dev/null @@ -1,38 +0,0 @@ -# todo: ship the conda env along with the codes -# todo: time each part of the pipeline to find out potential places for optimization - -from rag_utils import VectorBase, Retriever, Generator -import argparse, yaml, json, torch - - -DTYPE_2_TORCH_DTYPE = { - 'bf16': torch.bfloat16, - 'fp16': torch.float16, - 'fp32': torch.float32 -} - - -if __name__ == '__main__': - # Parse command-line arguments - parser = argparse.ArgumentParser(description="This script accepts two command-line arguments.") - parser.add_argument('--yaml-config', dest='yaml_config', required=True) - parser.add_argument('--mode', dest='mode', type=str) - args = parser.parse_args() - - # Load configuration from a YAML file - with open(args.yaml_config, 'r') as yaml_file: - config = yaml.load(yaml_file, Loader=yaml.FullLoader) - config['generator_model_torch_dtype'] = DTYPE_2_TORCH_DTYPE[config['generator_model_torch_dtype']] - - if 'v' in args.mode: - vb = VectorBase.from_yaml_config(config) - - if 'r' in args.mode: - ret = Retriever.from_yaml_config(vb, config) - # print(ret.ls_rets[list(ret.ls_rets.keys())[0]][0]) - - if 'g' in args.mode: - gen = Generator.from_yaml_config(ret, config) - - - diff --git a/sample.yaml b/sample.yaml deleted file mode 100755 index 2fbdf7b..0000000 --- a/sample.yaml +++ /dev/null @@ -1,68 +0,0 @@ ---- # TODO: -# - Model Generation Config: Temperature, Sampling and etc... 
- -# VectorBase -#/home/erfan/Desktop/actives/card/src2 -vb_cache_dir: ./caches/sample_experiment -vb_verbose: true -split_overlap: 0.1 -## Documents Source -docs_from_hf: true -docs_dataset_id: yixuantt/MultiHopRAG -docs_dataset_subset: corpus -docs_dataset_split: train -docs_dataset_column: body -## Embedding Model Source -embedding_model_id: thenlper/gte-small #BAAI/bge-large-en-v1.5 #BAAI/llm-embedder -embedding_model_device: cuda -chunk_size: 256 # <= model_max_seq_len - - - -# Retriever -rets_cache_dir: ./caches/sample_experiment -rets_verbose: true -num_retrievals: 10 # line 67: top_k -## Queries Source -queries_from_hf: true -queries_dataset_id: yixuantt/MultiHopRAG -queries_dataset_subset: MultiHopRAG -queries_dataset_split: train -queries_dataset_column: query - - - -# Generator -gens_cache_dir: ./caches/sample_experiment -gens_verbose: true -num_selections: 4 # line 139: top_n = top_k -## Generator Model Source -generator_model_id: microsoft/Phi-3.5-mini-instruct -generator_model_trust_remote_code: True -generator_model_device: cuda -generator_model_torch_dtype: bf16 -generator_model_config: - - max_new_tokens: 8 - - return_full_text: False - - temperature: 0 -## Reranker Model -reranker_model_id: colbert-ir/colbertv2.0 -## RAG Prompts -system_prompt: | - Using the information contained in the context, give a comprehensive answer to the question. - Respond only to the question asked, response should be concise and relevant to the question. - If the answer cannot be deduced from the context, do not generate any response on your own. - Place your response only in the following JSON and do not generate anything else: - {{ - "found_the_answer": , - "actual_response": , - "id_of_relevant_documents": , - }} - -context_prompt: | - Context: - {CONTEXT} - --- - Now here is the question you need to answer. - {QUERY} - diff --git a/src/ragbench/__init__.py b/src/ragbench/__init__.py new file mode 100644 index 0000000..ef50ac6 --- /dev/null +++ b/src/ragbench/__init__.py @@ -0,0 +1,10 @@ +from ragbench.rag import RAGBuilder +from ragbench.config import ComponentConfig, Config, DirectoryConfig, RetrieverConfig + +__all__ = [ + "RAGBuilder", + "Config", + "ComponentConfig", + "DirectoryConfig", + "RetrieverConfig", +] diff --git a/src/ragbench/config.py b/src/ragbench/config.py new file mode 100644 index 0000000..736e199 --- /dev/null +++ b/src/ragbench/config.py @@ -0,0 +1,104 @@ +from pydantic import BaseModel, field_validator +from typing import Dict, Any, Optional, Literal, List +import json + +import os +import importlib +import inspect + + +class ComponentConfig(BaseModel): + class_path: str + params: Dict[str, Any] = {} + + @field_validator("class_path") + @classmethod + def validate_class_path(cls, v): + """ + Validates that the class_path can be imported and the class exists. + """ + try: + module_path, class_name = v.rsplit(".", 1) + module = importlib.import_module(module_path) + getattr(module, class_name) + except (ImportError, AttributeError, ValueError) as e: + raise ValueError(f"Cannot import '{v}': {e}") + return v + + @field_validator("params") + @classmethod + def validate_params(cls, v, info): + """ + Validates that the params match the class's __init__ signature. 
+ """ + class_path = info.data.get("class_path") + if class_path: + try: + module_path, class_name = class_path.rsplit(".", 1) + module = importlib.import_module(module_path) + target_class = getattr(module, class_name) + sig = inspect.signature(target_class.__init__) + valid_params = sig.parameters.keys() + if "kwargs" in valid_params: + return v + for param in v: + if param not in valid_params: + raise ValueError( + f"Invalid parameter '{param}' for class '{class_path}'" + ) + except Exception as e: + raise ValueError(f"Error validating params for '{class_path}': {e}") + return v + + +class RetrieverConfig(BaseModel): + type: Literal["vanilla", "multi_query"] = "vanilla" + params: Dict[str, Any] = {"search_kwargs": {"k": 2}} + + +class DirectoryConfig(BaseModel): + name: str + params: Dict[str, Any] = {"sample_size": 1, "show_progress": True} + + +class Config(BaseModel): + directory: DirectoryConfig + experiment_name: str = "default_experiment" + text_splitter: ComponentConfig = ComponentConfig( + class_path="langchain_text_splitters.RecursiveCharacterTextSplitter", + params={"chunk_size": 512, "chunk_overlap": 100}, + ) + embedding_model: ComponentConfig = ComponentConfig( + class_path="langchain_openai.OpenAIEmbeddings", params={} + ) + llm: ComponentConfig = ComponentConfig( + class_path="langchain_openai.ChatOpenAI", params={"model": "gpt-4o"} + ) + retriever: RetrieverConfig = RetrieverConfig() + model_prompt: str = "Answer the user query based on the context" + + @field_validator("directory") + @classmethod + def validate_directory(cls, v): + """ + Validates that the directory exists. + """ + if not os.path.exists(v.name): + raise ValueError(f"Directory '{v.name}' does not exist.") + return v + + @classmethod + def load(cls, config_name_or_path: str): + config = json.load(open(config_name_or_path)) + return cls(**config) + + def save(self, path: str): + with open(path, "w") as f: + json.dump(self.model_dump(), f, indent=4) + + +def instantiate_component(component_config: ComponentConfig): + module_path, class_name = component_config.class_path.rsplit(".", 1) + module = importlib.import_module(module_path) + cls = getattr(module, class_name) + return cls(**component_config.params) diff --git a/src/ragbench/prompt.py b/src/ragbench/prompt.py new file mode 100644 index 0000000..950cc35 --- /dev/null +++ b/src/ragbench/prompt.py @@ -0,0 +1,26 @@ +from pydantic import BaseModel, Field +from ragas.prompt.pydantic_prompt import PydanticPrompt + + +class RAGInput(BaseModel): + query: str = Field(description="User query") + context: str = Field(description="Context to search for answers") + + +class RAGOutput(BaseModel): + response: str = Field(description="Response to the user query") + + +class RAGPrompt(PydanticPrompt[RAGInput, RAGOutput]): + instructions = "Answer the user query based on the context" + input_model = RAGInput + output_model = RAGOutput + examples = [ + ( + RAGInput( + query="What is the capital of France?", + context="Paris is the capital of France.", + ), + RAGOutput(response="Paris"), + ) + ] diff --git a/src/ragbench/rag.py b/src/ragbench/rag.py new file mode 100644 index 0000000..86720b9 --- /dev/null +++ b/src/ragbench/rag.py @@ -0,0 +1,177 @@ +import os +from langchain_text_splitters.base import TextSplitter +from langchain_core.embeddings import Embeddings +from langchain_core.language_models import BaseLanguageModel +from langchain_community.document_loaders import DirectoryLoader +from langchain_community.vectorstores import FAISS +import logging + + +from 
ragbench.config import Config, instantiate_component +from ragbench.prompt import RAGInput, RAGPrompt +from ragas.llms import LangchainLLMWrapper +from ragas.dataset_schema import EvaluationDataset +from ragas import evaluate +from ragas.metrics.base import Metric + + +from typing import List + +VECTOR_STORE_PATH = "../vector_stores" +RESULTS_PATH = "../results" + +logger = logging.getLogger(__name__) + + +class RAGBuilder: + def __init__( + self, + config: Config, + ) -> None: + self.config = config + llm = instantiate_component(config.llm) if config.llm else None + assert isinstance(llm, BaseLanguageModel) + text_splitter = ( + instantiate_component(config.text_splitter) + if config.text_splitter + else None + ) + assert isinstance(text_splitter, TextSplitter) + embedding_model = ( + instantiate_component(config.embedding_model) + if config.embedding_model + else None + ) + assert isinstance(embedding_model, Embeddings) + index_name = hash( + tuple( + [ + self.config.directory.model_dump_json(), + self.config.text_splitter.model_dump_json(), + self.config.embedding_model.model_dump_json(), + ] + ) + ) + self.index_name = f"index_{index_name}" + + self.db = None + self._directory = config.directory + self.llm = llm + self.text_splitter = text_splitter + self.embedding_model = embedding_model + self.db = self.build_index() + self.retriever_config = config.retriever + self.prompt = config.model_prompt + + if not os.path.exists(self.config.experiment_name): + os.makedirs(self.config.experiment_name) + + if not os.path.exists(VECTOR_STORE_PATH): + os.makedirs(VECTOR_STORE_PATH) + + if not os.path.exists(RESULTS_PATH): + os.makedirs(RESULTS_PATH) + + self.config.save(os.path.join(self.config.experiment_name, "config.json")) + + @classmethod + def from_saved_config(cls, config_name_or_path: str): + config = Config.load(config_name_or_path) + return cls(config) + + @property + def prompt(self): + return self._prompt.instruction + + @prompt.setter + def prompt(self, prompt): + self._prompt = RAGPrompt() + self._prompt.instruction = prompt + + def build_index(self): + if os.path.exists(os.path.join(VECTOR_STORE_PATH, self.index_name)): + return FAISS.load_local( + VECTOR_STORE_PATH, + self.embedding_model, + index_name=self.index_name, + allow_dangerous_deserialization=True, + ) + loader = DirectoryLoader(self._directory.name, **self._directory.params) + documents = loader.load() + nodes = self._text_splitter.split_documents(documents) + db = FAISS.from_documents(nodes, self.embedding_model) + db.save_local(VECTOR_STORE_PATH, self.index_name) + return db + + @property + def embedding_model(self): + return self._embedding_model + + @embedding_model.setter + def embedding_model(self, embedding_model): + self._embedding_model = embedding_model + if self.db is not None: + logger.info("Embedding model set. Reinitalizing index.") + self.db = self.build_index() + + @property + def text_splitter(self): + return self._text_splitter + + @text_splitter.setter + def text_splitter(self, text_splitter): + self._text_splitter = text_splitter + if self.db is not None: + logger.info("Text splitter set. Reinitalizing index.") + self.db = self.build_index() + + @property + def retriever_config(self): + return self._retriever_config + + @retriever_config.setter + def retriever_config(self, retriever_config): + assert ( + self.db is not None + ), "Index must be built before retriever can be loaded." 
+ + params = retriever_config.params + name = retriever_config.type + if name == "vanilla": + self._retriever = self.db.as_retriever(**params) + + elif name == "multi_query": + from langchain.retrievers.multi_query import MultiQueryRetriever + + self._retriever = MultiQueryRetriever.from_llm( + retriever=self.db.as_retriever(**params), llm=self.llm + ) + self._retriever_config = retriever_config + + async def invoke(self, dataset: EvaluationDataset): + assert self._retriever is not None, "Retriever must be set before invoking." + llm = LangchainLLMWrapper(self.llm) + + for sample in dataset: + documents = self._retriever.invoke(sample.user_input) + context = [document.page_content for document in documents] + prompt_input = RAGInput(query=sample.user_input, context="\n".join(context)) + response = await self._prompt.generate(data=prompt_input, llm=llm) + sample.retrieved_contexts = context + sample.response = response.response + + return dataset + + async def benchmark( + self, + dataset: EvaluationDataset, + metrics: List[Metric], + eval_llm: BaseLanguageModel, + **kwargs, + ): + dataset = await self.invoke(dataset) + results = evaluate(dataset, metrics=metrics, llm=eval_llm, **kwargs) + results.to_pandas().to_csv( + os.path.join(RESULTS_PATH, self.config.experiment_name, "results.csv") + ) + return results
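
For anyone reading this diff without the notebook handy, here is a minimal sketch of how the new `ragbench` pieces are intended to be driven end to end. It mirrors the flow in `notebooks/experiment.ipynb` but is not part of the commit: it assumes an `OPENAI_API_KEY` in the environment (the default config uses `langchain_openai` components), that the sample corpus exists at `../datasets/Sample_Docs_Markdown/`, that the ragas `EvaluationDataset`/`SingleTurnSample` constructors behave as shown, and a notebook context where top-level `await` is available, since `RAGBuilder.benchmark` is a coroutine.

```python
# Hypothetical usage sketch (not part of this diff) for the ragbench API added above.
from ragbench import Config, DirectoryConfig, RAGBuilder
from ragas.dataset_schema import EvaluationDataset, SingleTurnSample  # assumed ragas API
from ragas.metrics import FactualCorrectness
from langchain_openai import ChatOpenAI

# Only the corpus directory is mandatory; the text splitter, embeddings, LLM and
# retriever fall back to the defaults defined in ragbench.config.Config.
config = Config(directory=DirectoryConfig(name="../datasets/Sample_Docs_Markdown/"))

# Loads the documents, chunks them, builds (or reloads) the FAISS index under
# ../vector_stores, and persists config.json for the experiment.
rag = RAGBuilder(config)

# A toy evaluation set: one query plus a gold reference answer.
dataset = EvaluationDataset(
    samples=[
        SingleTurnSample(user_input="What is an MoE?", reference="MoE is something"),
    ]
)

# Retrieve, generate and score; per the code above, benchmark also writes
# ../results/<experiment_name>/results.csv as configured by RESULTS_PATH.
results = await rag.benchmark(
    dataset=dataset,
    metrics=[FactualCorrectness()],
    eval_llm=ChatOpenAI(model="gpt-4o"),
)
print(results)
```

Note that `benchmark` awaits `invoke` (retrieval plus prompt generation) before handing the populated dataset to `ragas.evaluate`, so the returned object is an ordinary ragas result that can also be inspected with `results.to_pandas()`.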