From 922c5763a5da0d8d7d8639061c1965c55e4e6f9b Mon Sep 17 00:00:00 2001 From: Rahul Tripathi Date: Mon, 15 Apr 2024 16:24:38 +0530 Subject: [PATCH] Add semantic info to metadata, classified by pebblo-server. Signed-off-by: Rahul Tripathi --- .../document_loaders/pebblo.ipynb | 37 ++++- .../document_loaders/pebblo.py | 156 +++++++++++++++--- .../langchain_community/utilities/pebblo.py | 5 + 3 files changed, 174 insertions(+), 24 deletions(-) diff --git a/docs/docs/integrations/document_loaders/pebblo.ipynb b/docs/docs/integrations/document_loaders/pebblo.ipynb index e444c426cd23b..177a11fbab9f2 100644 --- a/docs/docs/integrations/document_loaders/pebblo.ipynb +++ b/docs/docs/integrations/document_loaders/pebblo.ipynb @@ -69,7 +69,7 @@ "source": [ "### Send semantic topics and identities to Pebblo cloud server\n", "\n", - "To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-ket in `PEBBLO_API_KEY` environment variable." + "To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-key in `PEBBLO_API_KEY` environment variable." ] }, { @@ -91,6 +91,41 @@ "documents = loader.load()\n", "print(documents)" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add semantic topics and identities to loaded metadata\n", + "\n", + "To add semantic topics and sematic entities to metadata of loaded documents, set load_semantic to True as an argument or alternatively, define a new environment variable `PEBBLO_LOAD_SEMANTIC`, and setting it to True." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.document_loaders.csv_loader import CSVLoader\n", + "from langchain_community.document_loaders import PebbloSafeLoader\n", + "\n", + "loader = PebbloSafeLoader(\n", + " CSVLoader(\"data/corp_sens_data.csv\"),\n", + " name=\"acme-corp-rag-1\", # App name (Mandatory)\n", + " owner=\"Joe Smith\", # Owner (Optional)\n", + " description=\"Support productivity RAG application\", # Description (Optional)\n", + " api_key=\"my-api-key\", # API key (Optional, can be set in the environment variable PEBBLO_API_KEY)\n", + " load_semantic=True, # Load semantic data (Optional, default is False, can be set in the environment variable PEBBLO_LOAD_SEMANTIC)\n", + ")\n", + "documents = loader.load()\n", + "print(documents[0].metadata)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] } ], "metadata": { diff --git a/libs/community/langchain_community/document_loaders/pebblo.py b/libs/community/langchain_community/document_loaders/pebblo.py index 5cdeffaac4e8e..8b348826293cc 100644 --- a/libs/community/langchain_community/document_loaders/pebblo.py +++ b/libs/community/langchain_community/document_loaders/pebblo.py @@ -5,9 +5,9 @@ import os import uuid from http import HTTPStatus -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Dict, Iterator, List, Optional, Union -import requests +import requests # type: ignore from langchain_core.documents import Document from langchain_community.document_loaders.base import BaseLoader @@ -19,6 +19,7 @@ PLUGIN_VERSION, App, Doc, + IndexedDocument, get_full_path, get_loader_full_path, get_loader_type, @@ -43,6 +44,7 @@ def __init__( owner: str = "", description: str = "", api_key: Optional[str] = None, + load_semantic: bool = False, ): if not name or not isinstance(name, str): raise NameError("Must specify a valid name.") @@ -50,15 +52,17 @@ def __init__( self.api_key = os.environ.get("PEBBLO_API_KEY") or api_key self.load_id = str(uuid.uuid4()) self.loader = langchain_loader + self.load_semantic = os.environ.get("PEBBLO_LOAD_SEMANTIC") or load_semantic self.owner = owner self.description = description self.source_path = get_loader_full_path(self.loader) self.source_owner = PebbloSafeLoader.get_file_owner_from_path(self.source_path) self.docs: List[Document] = [] + self.docs_with_id: Union[List[IndexedDocument], List[Document], List] = [] loader_name = str(type(self.loader)).split(".")[-1].split("'")[0] self.source_type = get_loader_type(loader_name) self.source_path_size = self.get_source_size(self.source_path) - self.source_aggr_size = 0 + self.source_aggregate_size = 0 self.loader_details = { "loader": loader_name, "source_path": self.source_path, @@ -80,7 +84,15 @@ def load(self) -> List[Document]: list: Documents fetched from load method of the wrapped `loader`. """ self.docs = self.loader.load() - self._send_loader_doc(loading_end=True) + if not self.load_semantic: + self._classify_doc(self.docs, loading_end=True) + return self.docs + self.docs_with_id = self._index_docs() + classified_docs = self._classify_doc(self.docs_with_id, loading_end=True) + self.docs_with_id = self._add_semantic_to_docs( + self.docs_with_id, classified_docs + ) + self.docs = self._unindex_docs(self.docs_with_id) # type: ignore return self.docs def lazy_load(self) -> Iterator[Document]: @@ -104,13 +116,19 @@ def lazy_load(self) -> Iterator[Document]: doc = next(doc_iterator) except StopIteration: self.docs = [] - self._send_loader_doc(loading_end=True) break - self.docs = [ - doc, - ] - self._send_loader_doc() - yield doc + self.docs = list((doc,)) + if not self.load_semantic: + self._classify_doc(self.docs, loading_end=True) + yield self.docs[0] + else: + self.docs_with_id = self._index_docs() + classified_doc = self._classify_doc(self.docs) + self.docs_with_id = self._add_semantic_to_docs( + self.docs_with_id, classified_doc + ) + self.docs = self._unindex_docs(self.docs_with_id) # type: ignore + yield self.docs[0] @classmethod def set_discover_sent(cls) -> None: @@ -120,16 +138,23 @@ def set_discover_sent(cls) -> None: def set_loader_sent(cls) -> None: cls._loader_sent = True - def _send_loader_doc(self, loading_end: bool = False) -> list: + def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list: """Send documents fetched from loader to pebblo-server. Then send classified documents to Daxa cloud(If api_key is present). Internal method. Args: + + loaded_docs (list): List of documents fetched from loader's load operation. loading_end (bool, optional): Flag indicating the halt of data - loading by loader. Defaults to False. + loading by loader. Defaults to False. """ - headers = {"Accept": "application/json", "Content-Type": "application/json"} - doc_content = [doc.dict() for doc in self.docs] + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + } + if loading_end is True: + PebbloSafeLoader.set_loader_sent() + doc_content = [doc.dict() for doc in loaded_docs] docs = [] for doc in doc_content: doc_authorized_identities = doc.get("metadata", {}).get( @@ -144,11 +169,13 @@ def _send_loader_doc(self, loading_end: bool = False) -> list: doc_source_size = self.get_source_size(doc_source_path) page_content = str(doc.get("page_content")) page_content_size = self.calculate_content_size(page_content) - self.source_aggr_size += page_content_size + self.source_aggregate_size += page_content_size + doc_id = doc.get("id", None) or 0 docs.append( { "doc": page_content, "source_path": doc_source_path, + "id": doc_id, "last_modified": doc.get("metadata", {}).get("last_modified"), "file_owner": doc_source_owner, **( @@ -176,7 +203,9 @@ def _send_loader_doc(self, loading_end: bool = False) -> list: if loading_end is True: payload["loading_end"] = "true" if "loader_details" in payload: - payload["loader_details"]["source_aggr_size"] = self.source_aggr_size + payload["loader_details"]["source_aggregate_size"] = ( # noqa + self.source_aggregate_size + ) payload = Doc(**payload).dict(exclude_unset=True) load_doc_url = f"{CLASSIFIER_URL}{LOADER_DOC_URL}" classified_docs = [] @@ -202,11 +231,9 @@ def _send_loader_doc(self, loading_end: bool = False) -> list: except requests.exceptions.RequestException: logger.warning("Unable to reach pebblo server.") except Exception as e: - logger.warning("An Exception caught in _send_loader_doc: %s", e) - + logger.warning("An Exception caught in _send_loader_doc: local %s", e) if self.api_key: if not classified_docs: - logger.warning("No classified docs to send to pebblo-cloud.") return classified_docs try: payload["docs"] = classified_docs @@ -234,7 +261,7 @@ def _send_loader_doc(self, loading_end: bool = False) -> list: except requests.exceptions.RequestException: logger.warning("Unable to reach Pebblo cloud server.") except Exception as e: - logger.warning("An Exception caught in _send_loader_doc: %s", e) + logger.warning("An Exception caught in _send_loader_doc: cloud %s", e) if loading_end is True: PebbloSafeLoader.set_loader_sent() @@ -270,6 +297,12 @@ def _send_discover(self) -> None: pebblo_resp = requests.post( app_discover_url, headers=headers, json=payload, timeout=20 ) + if self.api_key: + pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}/v1/discover" + headers.update({"x-api-key": self.api_key}) + _ = requests.post( + pebblo_cloud_url, headers=headers, json=payload, timeout=20 + ) logger.debug( "send_discover[local]: request url %s, body %s len %s\ response status %s body %s", @@ -287,8 +320,8 @@ def _send_discover(self) -> None: ) except requests.exceptions.RequestException: logger.warning("Unable to reach pebblo server.") - except Exception: - logger.warning("An Exception caught in _send_discover.") + except Exception as e: + logger.warning("An Exception caught in _send_discover: local %s", e) if self.api_key: try: @@ -316,7 +349,7 @@ def _send_discover(self) -> None: except requests.exceptions.RequestException: logger.warning("Unable to reach Pebblo cloud server.") except Exception as e: - logger.warning("An Exception caught in _send_discover: %s", e) + logger.warning("An Exception caught in _send_discover: cloud %s", e) def _get_app_details(self) -> App: """Fetch app details. Internal method. @@ -378,3 +411,80 @@ def get_source_size(self, source_path: str) -> int: total_size += os.path.getsize(fp) size = total_size return size + + def _index_docs(self) -> List[IndexedDocument]: + """ + Indexes the documents and returns a list of IndexedDocument objects. + + Returns: + List[IndexedDocument]: A list of IndexedDocument objects with unique IDs. + """ + docs_with_id = [ + IndexedDocument(id=hex(i)[2:], **doc.dict()) + for i, doc in enumerate(self.docs) + ] + return docs_with_id + + def _add_semantic_to_docs( + self, docs_with_id: List[IndexedDocument], classified_docs: List[dict] + ) -> List[Document]: + """ + Adds semantic metadata to the given list of documents. + + Args: + docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects + containing the documents with their IDs. + classified_docs (List[dict]): A list of dictionaries containing the + classified documents. + + Returns: + List[Document]: A list of Document objects with added semantic metadata. + """ + indexed_docs = { + doc.id: Document(page_content=doc.page_content, metadata=doc.metadata) + for doc in docs_with_id + } + + for classified_doc in classified_docs: + doc_id = classified_doc.get("id") + if doc_id in indexed_docs: + self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc) + + semantic_metadata_docs = [doc for doc in indexed_docs.values()] + + return semantic_metadata_docs + + def _unindex_docs(self, docs_with_id: List[IndexedDocument]) -> List[Document]: + """ + Converts a list of IndexedDocument objects to a list of Document objects. + + Args: + docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects. + + Returns: + List[Document]: A list of Document objects. + """ + docs = [ + Document(page_content=doc.page_content, metadata=doc.metadata) + for i, doc in enumerate(docs_with_id) + ] + return docs + + def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document: + """ + Adds semantic metadata to the given document in-place. + + Args: + doc (Document): A Document object. + classified_doc (dict): A dictionary containing the classified document. + + Returns: + Document: The Document object with added semantic metadata. + """ + doc.metadata["pebblo_semantic_entities"] = list( + classified_doc.get("entities", {}).keys() + ) + doc.metadata["pebblo_semantic_topics"] = list( + classified_doc.get("topics", {}).keys() + ) + return doc diff --git a/libs/community/langchain_community/utilities/pebblo.py b/libs/community/langchain_community/utilities/pebblo.py index da65a5835dde8..df799c7fe00a3 100644 --- a/libs/community/langchain_community/utilities/pebblo.py +++ b/libs/community/langchain_community/utilities/pebblo.py @@ -6,6 +6,7 @@ import platform from typing import Optional, Tuple +from langchain_core.documents import Document from langchain_core.env import get_runtime_environment from langchain_core.pydantic_v1 import BaseModel @@ -61,6 +62,10 @@ logger = logging.getLogger(__name__) +class IndexedDocument(Document): + id: str + + class Runtime(BaseModel): """Pebblo Runtime.