Add semantic info to metadata, classified by pebblo-server.
Signed-off-by: Rahul Tripathi <[email protected]>
Rahul Tripathi committed Apr 25, 2024
1 parent 6ccecf2 commit 922c576
Showing 3 changed files with 174 additions and 24 deletions.
37 changes: 36 additions & 1 deletion docs/docs/integrations/document_loaders/pebblo.ipynb
@@ -69,7 +69,7 @@
"source": [
"### Send semantic topics and identities to Pebblo cloud server\n",
"\n",
"To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-ket in `PEBBLO_API_KEY` environment variable."
"To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-key in `PEBBLO_API_KEY` environment variable."
]
},
{
@@ -91,6 +91,41 @@
"documents = loader.load()\n",
"print(documents)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Add semantic topics and identities to loaded metadata\n",
"\n",
"To add semantic topics and sematic entities to metadata of loaded documents, set load_semantic to True as an argument or alternatively, define a new environment variable `PEBBLO_LOAD_SEMANTIC`, and setting it to True."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders.csv_loader import CSVLoader\n",
"from langchain_community.document_loaders import PebbloSafeLoader\n",
"\n",
"loader = PebbloSafeLoader(\n",
" CSVLoader(\"data/corp_sens_data.csv\"),\n",
" name=\"acme-corp-rag-1\", # App name (Mandatory)\n",
" owner=\"Joe Smith\", # Owner (Optional)\n",
" description=\"Support productivity RAG application\", # Description (Optional)\n",
" api_key=\"my-api-key\", # API key (Optional, can be set in the environment variable PEBBLO_API_KEY)\n",
" load_semantic=True, # Load semantic data (Optional, default is False, can be set in the environment variable PEBBLO_LOAD_SEMANTIC)\n",
")\n",
"documents = loader.load()\n",
"print(documents[0].metadata)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": []
}
],
"metadata": {
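For orientation, a minimal sketch of reading the semantic metadata back out of the loaded documents — it assumes the `loader` built in the notebook cell above; the two `pebblo_semantic_*` keys are the ones the loader attaches, while the actual topic and entity labels depend on the Pebblo classifier's output.

```python
# Minimal sketch: iterate over documents returned by a PebbloSafeLoader configured
# with load_semantic=True (see the notebook cell above) and read the semantic
# metadata it attaches. The printed labels are whatever the classifier returns.
documents = loader.load()
for doc in documents:
    topics = doc.metadata.get("pebblo_semantic_topics", [])
    entities = doc.metadata.get("pebblo_semantic_entities", [])
    print(f"{doc.metadata.get('source')}: topics={topics}, entities={entities}")
```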
156 changes: 133 additions & 23 deletions libs/community/langchain_community/document_loaders/pebblo.py
@@ -5,9 +5,9 @@
import os
import uuid
from http import HTTPStatus
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, Iterator, List, Optional, Union

import requests
import requests # type: ignore
from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
@@ -19,6 +19,7 @@
PLUGIN_VERSION,
App,
Doc,
IndexedDocument,
get_full_path,
get_loader_full_path,
get_loader_type,
@@ -43,22 +44,25 @@ def __init__(
owner: str = "",
description: str = "",
api_key: Optional[str] = None,
load_semantic: bool = False,
):
if not name or not isinstance(name, str):
raise NameError("Must specify a valid name.")
self.app_name = name
self.api_key = os.environ.get("PEBBLO_API_KEY") or api_key
self.load_id = str(uuid.uuid4())
self.loader = langchain_loader
self.load_semantic = os.environ.get("PEBBLO_LOAD_SEMANTIC") or load_semantic
self.owner = owner
self.description = description
self.source_path = get_loader_full_path(self.loader)
self.source_owner = PebbloSafeLoader.get_file_owner_from_path(self.source_path)
self.docs: List[Document] = []
self.docs_with_id: Union[List[IndexedDocument], List[Document], List] = []
loader_name = str(type(self.loader)).split(".")[-1].split("'")[0]
self.source_type = get_loader_type(loader_name)
self.source_path_size = self.get_source_size(self.source_path)
self.source_aggr_size = 0
self.source_aggregate_size = 0
self.loader_details = {
"loader": loader_name,
"source_path": self.source_path,
@@ -80,7 +84,15 @@ def load(self) -> List[Document]:
list: Documents fetched from load method of the wrapped `loader`.
"""
self.docs = self.loader.load()
self._send_loader_doc(loading_end=True)
if not self.load_semantic:
self._classify_doc(self.docs, loading_end=True)
return self.docs
self.docs_with_id = self._index_docs()
classified_docs = self._classify_doc(self.docs_with_id, loading_end=True)
self.docs_with_id = self._add_semantic_to_docs(
self.docs_with_id, classified_docs
)
self.docs = self._unindex_docs(self.docs_with_id) # type: ignore
return self.docs

def lazy_load(self) -> Iterator[Document]:
@@ -104,13 +116,19 @@ def lazy_load(self) -> Iterator[Document]:
doc = next(doc_iterator)
except StopIteration:
self.docs = []
self._send_loader_doc(loading_end=True)
break
self.docs = [
doc,
]
self._send_loader_doc()
yield doc
self.docs = list((doc,))
if not self.load_semantic:
self._classify_doc(self.docs, loading_end=True)
yield self.docs[0]
else:
self.docs_with_id = self._index_docs()
classified_doc = self._classify_doc(self.docs)
self.docs_with_id = self._add_semantic_to_docs(
self.docs_with_id, classified_doc
)
self.docs = self._unindex_docs(self.docs_with_id) # type: ignore
yield self.docs[0]

@classmethod
def set_discover_sent(cls) -> None:
@@ -120,16 +138,23 @@ def set_discover_sent(cls) -> None:
def set_loader_sent(cls) -> None:
cls._loader_sent = True

def _send_loader_doc(self, loading_end: bool = False) -> list:
def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
"""Send documents fetched from loader to pebblo-server. Then send
classified documents to Daxa cloud(If api_key is present). Internal method.
Args:
loaded_docs (list): List of documents fetched from loader's load operation.
loading_end (bool, optional): Flag indicating the halt of data
loading by loader. Defaults to False.
loading by loader. Defaults to False.
"""
headers = {"Accept": "application/json", "Content-Type": "application/json"}
doc_content = [doc.dict() for doc in self.docs]
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if loading_end is True:
PebbloSafeLoader.set_loader_sent()
doc_content = [doc.dict() for doc in loaded_docs]
docs = []
for doc in doc_content:
doc_authorized_identities = doc.get("metadata", {}).get(
@@ -144,11 +169,13 @@ def _send_loader_doc(self, loading_end: bool = False) -> list:
doc_source_size = self.get_source_size(doc_source_path)
page_content = str(doc.get("page_content"))
page_content_size = self.calculate_content_size(page_content)
self.source_aggr_size += page_content_size
self.source_aggregate_size += page_content_size
doc_id = doc.get("id", None) or 0
docs.append(
{
"doc": page_content,
"source_path": doc_source_path,
"id": doc_id,
"last_modified": doc.get("metadata", {}).get("last_modified"),
"file_owner": doc_source_owner,
**(
@@ -176,7 +203,9 @@ def _send_loader_doc(self, loading_end: bool = False) -> list:
if loading_end is True:
payload["loading_end"] = "true"
if "loader_details" in payload:
payload["loader_details"]["source_aggr_size"] = self.source_aggr_size
payload["loader_details"]["source_aggregate_size"] = ( # noqa
self.source_aggregate_size
)
payload = Doc(**payload).dict(exclude_unset=True)
load_doc_url = f"{CLASSIFIER_URL}{LOADER_DOC_URL}"
classified_docs = []
@@ -202,11 +231,9 @@ def _send_loader_doc(self, loading_end: bool = False) -> list:
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: %s", e)

logger.warning("An Exception caught in _send_loader_doc: local %s", e)
if self.api_key:
if not classified_docs:
logger.warning("No classified docs to send to pebblo-cloud.")
return classified_docs
try:
payload["docs"] = classified_docs
@@ -234,7 +261,7 @@ def _send_loader_doc(self, loading_end: bool = False) -> list:
except requests.exceptions.RequestException:
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: %s", e)
logger.warning("An Exception caught in _send_loader_doc: cloud %s", e)

if loading_end is True:
PebbloSafeLoader.set_loader_sent()
@@ -270,6 +297,12 @@ def _send_discover(self) -> None:
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=payload, timeout=20
)
if self.api_key:
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}/v1/discover"
headers.update({"x-api-key": self.api_key})
_ = requests.post(
pebblo_cloud_url, headers=headers, json=payload, timeout=20
)
logger.debug(
"send_discover[local]: request url %s, body %s len %s\
response status %s body %s",
@@ -287,8 +320,8 @@ def _send_discover(self) -> None:
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception:
logger.warning("An Exception caught in _send_discover.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)

if self.api_key:
try:
@@ -316,7 +349,7 @@ def _send_discover(self) -> None:
except requests.exceptions.RequestException:
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: %s", e)
logger.warning("An Exception caught in _send_discover: cloud %s", e)

def _get_app_details(self) -> App:
"""Fetch app details. Internal method.
@@ -378,3 +411,80 @@ def get_source_size(self, source_path: str) -> int:
total_size += os.path.getsize(fp)
size = total_size
return size

def _index_docs(self) -> List[IndexedDocument]:
"""
Indexes the documents and returns a list of IndexedDocument objects.
Returns:
List[IndexedDocument]: A list of IndexedDocument objects with unique IDs.
"""
docs_with_id = [
IndexedDocument(id=hex(i)[2:], **doc.dict())
for i, doc in enumerate(self.docs)
]
return docs_with_id

def _add_semantic_to_docs(
self, docs_with_id: List[IndexedDocument], classified_docs: List[dict]
) -> List[Document]:
"""
Adds semantic metadata to the given list of documents.
Args:
docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects
containing the documents with their IDs.
classified_docs (List[dict]): A list of dictionaries containing the
classified documents.
Returns:
List[Document]: A list of Document objects with added semantic metadata.
"""
indexed_docs = {
doc.id: Document(page_content=doc.page_content, metadata=doc.metadata)
for doc in docs_with_id
}

for classified_doc in classified_docs:
doc_id = classified_doc.get("id")
if doc_id in indexed_docs:
self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc)

semantic_metadata_docs = [doc for doc in indexed_docs.values()]

return semantic_metadata_docs

def _unindex_docs(self, docs_with_id: List[IndexedDocument]) -> List[Document]:
"""
Converts a list of IndexedDocument objects to a list of Document objects.
Args:
docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects.
Returns:
List[Document]: A list of Document objects.
"""
docs = [
Document(page_content=doc.page_content, metadata=doc.metadata)
for i, doc in enumerate(docs_with_id)
]
return docs

def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document:
"""
Adds semantic metadata to the given document in-place.
Args:
doc (Document): A Document object.
classified_doc (dict): A dictionary containing the classified document.
Returns:
Document: The Document object with added semantic metadata.
"""
doc.metadata["pebblo_semantic_entities"] = list(
classified_doc.get("entities", {}).keys()
)
doc.metadata["pebblo_semantic_topics"] = list(
classified_doc.get("topics", {}).keys()
)
return doc
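The reworked `load()` follows an index → classify → merge-by-id → un-index pattern. Below is a self-contained sketch of that same pattern with a stubbed classifier standing in for the pebblo-server call; the stub, its labels, and the sample document are illustrative assumptions, while the `hex(i)[2:]` ids and the `pebblo_semantic_*` metadata keys mirror the loader code above.

```python
from typing import Dict, List

from langchain_core.documents import Document


def classify_stub(indexed_docs: List[dict]) -> List[dict]:
    # Stand-in for the pebblo-server classification call: one record per doc id.
    return [
        {"id": d["id"], "topics": {"financial-report": 1}, "entities": {}}
        for d in indexed_docs
    ]


docs = [Document(page_content="Q1 revenue summary", metadata={"source": "report.txt"})]

# 1. Index: give each document a stable hex id (mirrors _index_docs).
indexed = [{"id": hex(i)[2:], "doc": d.page_content} for i, d in enumerate(docs)]

# 2. Classify: the server echoes the ids back with topics/entities attached.
classified: Dict[str, dict] = {c["id"]: c for c in classify_stub(indexed)}

# 3. Merge and un-index: copy topic/entity names into metadata and keep plain
#    Documents (mirrors _add_semantic_to_docs / _unindex_docs).
for i, doc in enumerate(docs):
    record = classified.get(hex(i)[2:], {})
    doc.metadata["pebblo_semantic_topics"] = list(record.get("topics", {}).keys())
    doc.metadata["pebblo_semantic_entities"] = list(record.get("entities", {}).keys())

print(docs[0].metadata)
```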
5 changes: 5 additions & 0 deletions libs/community/langchain_community/utilities/pebblo.py
@@ -6,6 +6,7 @@
import platform
from typing import Optional, Tuple

from langchain_core.documents import Document
from langchain_core.env import get_runtime_environment
from langchain_core.pydantic_v1 import BaseModel

@@ -61,6 +62,10 @@
logger = logging.getLogger(__name__)


class IndexedDocument(Document):
id: str


class Runtime(BaseModel):
"""Pebblo Runtime.
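`IndexedDocument` simply extends the standard `Document` with a transient string `id`, which is what lets classified results coming back from the server be matched to their source documents. A short sketch, assuming only the class as defined above:

```python
from langchain_core.documents import Document


class IndexedDocument(Document):
    """Document carrying a transient string id (as added in utilities/pebblo.py)."""

    id: str


original = Document(page_content="hello", metadata={"source": "a.txt"})
indexed = IndexedDocument(id=hex(0)[2:], **original.dict())  # same pattern as _index_docs
assert indexed.id == "0" and indexed.page_content == original.page_content
```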
