Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add semantic info to metadata, classified by pebblo-server. #20468

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion docs/docs/integrations/document_loaders/pebblo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"source": [
"### Send semantic topics and identities to Pebblo cloud server\n",
"\n",
"To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-ket in `PEBBLO_API_KEY` environment variable."
"To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-key in `PEBBLO_API_KEY` environment variable."
]
},
{
Expand All @@ -91,6 +91,41 @@
"documents = loader.load()\n",
"print(documents)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Add semantic topics and identities to loaded metadata\n",
"\n",
"To add semantic topics and semantic entities to the metadata of loaded documents, set load_semantic to True as an argument or alternatively, define a new environment variable `PEBBLO_LOAD_SEMANTIC` and set it to True."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders.csv_loader import CSVLoader\n",
"from langchain_community.document_loaders import PebbloSafeLoader\n",
"\n",
"loader = PebbloSafeLoader(\n",
" CSVLoader(\"data/corp_sens_data.csv\"),\n",
" name=\"acme-corp-rag-1\", # App name (Mandatory)\n",
" owner=\"Joe Smith\", # Owner (Optional)\n",
" description=\"Support productivity RAG application\", # Description (Optional)\n",
" api_key=\"my-api-key\", # API key (Optional, can be set in the environment variable PEBBLO_API_KEY)\n",
" load_semantic=True, # Load semantic data (Optional, default is False, can be set in the environment variable PEBBLO_LOAD_SEMANTIC)\n",
")\n",
"documents = loader.load()\n",
"print(documents[0].metadata)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": []
}
],
"metadata": {
Expand Down
156 changes: 133 additions & 23 deletions libs/community/langchain_community/document_loaders/pebblo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import os
import uuid
from http import HTTPStatus
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, Iterator, List, Optional, Union

import requests
import requests # type: ignore
from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
Expand All @@ -19,6 +19,7 @@
PLUGIN_VERSION,
App,
Doc,
IndexedDocument,
get_full_path,
get_loader_full_path,
get_loader_type,
Expand All @@ -43,22 +44,25 @@ def __init__(
owner: str = "",
description: str = "",
api_key: Optional[str] = None,
load_semantic: bool = False,
):
if not name or not isinstance(name, str):
raise NameError("Must specify a valid name.")
self.app_name = name
self.api_key = os.environ.get("PEBBLO_API_KEY") or api_key
self.load_id = str(uuid.uuid4())
self.loader = langchain_loader
self.load_semantic = os.environ.get("PEBBLO_LOAD_SEMANTIC") or load_semantic
self.owner = owner
self.description = description
self.source_path = get_loader_full_path(self.loader)
self.source_owner = PebbloSafeLoader.get_file_owner_from_path(self.source_path)
self.docs: List[Document] = []
self.docs_with_id: Union[List[IndexedDocument], List[Document], List] = []
loader_name = str(type(self.loader)).split(".")[-1].split("'")[0]
self.source_type = get_loader_type(loader_name)
self.source_path_size = self.get_source_size(self.source_path)
self.source_aggr_size = 0
self.source_aggregate_size = 0
self.loader_details = {
"loader": loader_name,
"source_path": self.source_path,
Expand All @@ -80,7 +84,15 @@ def load(self) -> List[Document]:
list: Documents fetched from load method of the wrapped `loader`.
"""
self.docs = self.loader.load()
self._send_loader_doc(loading_end=True)
if not self.load_semantic:
self._classify_doc(self.docs, loading_end=True)
return self.docs
self.docs_with_id = self._index_docs()
classified_docs = self._classify_doc(self.docs_with_id, loading_end=True)
self.docs_with_id = self._add_semantic_to_docs(
self.docs_with_id, classified_docs
)
self.docs = self._unindex_docs(self.docs_with_id) # type: ignore
return self.docs

def lazy_load(self) -> Iterator[Document]:
Expand All @@ -104,13 +116,19 @@ def lazy_load(self) -> Iterator[Document]:
doc = next(doc_iterator)
except StopIteration:
self.docs = []
self._send_loader_doc(loading_end=True)
break
self.docs = [
doc,
]
self._send_loader_doc()
yield doc
self.docs = list((doc,))
if not self.load_semantic:
self._classify_doc(self.docs, loading_end=True)
yield self.docs[0]
else:
self.docs_with_id = self._index_docs()
classified_doc = self._classify_doc(self.docs)
self.docs_with_id = self._add_semantic_to_docs(
self.docs_with_id, classified_doc
)
self.docs = self._unindex_docs(self.docs_with_id) # type: ignore
yield self.docs[0]

@classmethod
def set_discover_sent(cls) -> None:
Expand All @@ -120,16 +138,23 @@ def set_discover_sent(cls) -> None:
def set_loader_sent(cls) -> None:
cls._loader_sent = True

def _send_loader_doc(self, loading_end: bool = False) -> list:
def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
"""Send documents fetched from loader to pebblo-server. Then send
classified documents to Daxa cloud(If api_key is present). Internal method.

Args:

loaded_docs (list): List of documents fetched from loader's load operation.
loading_end (bool, optional): Flag indicating the halt of data
loading by loader. Defaults to False.
loading by loader. Defaults to False.
"""
headers = {"Accept": "application/json", "Content-Type": "application/json"}
doc_content = [doc.dict() for doc in self.docs]
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if loading_end is True:
PebbloSafeLoader.set_loader_sent()
doc_content = [doc.dict() for doc in loaded_docs]
docs = []
for doc in doc_content:
doc_authorized_identities = doc.get("metadata", {}).get(
Expand All @@ -144,11 +169,13 @@ def _send_loader_doc(self, loading_end: bool = False) -> list:
doc_source_size = self.get_source_size(doc_source_path)
page_content = str(doc.get("page_content"))
page_content_size = self.calculate_content_size(page_content)
self.source_aggr_size += page_content_size
self.source_aggregate_size += page_content_size
doc_id = doc.get("id", None) or 0
docs.append(
{
"doc": page_content,
"source_path": doc_source_path,
"id": doc_id,
"last_modified": doc.get("metadata", {}).get("last_modified"),
"file_owner": doc_source_owner,
**(
Expand Down Expand Up @@ -176,7 +203,9 @@ def _send_loader_doc(self, loading_end: bool = False) -> list:
if loading_end is True:
payload["loading_end"] = "true"
if "loader_details" in payload:
payload["loader_details"]["source_aggr_size"] = self.source_aggr_size
payload["loader_details"]["source_aggregate_size"] = ( # noqa
self.source_aggregate_size
)
payload = Doc(**payload).dict(exclude_unset=True)
load_doc_url = f"{CLASSIFIER_URL}{LOADER_DOC_URL}"
classified_docs = []
Expand All @@ -202,11 +231,9 @@ def _send_loader_doc(self, loading_end: bool = False) -> list:
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: %s", e)

logger.warning("An Exception caught in _send_loader_doc: local %s", e)
if self.api_key:
if not classified_docs:
logger.warning("No classified docs to send to pebblo-cloud.")
return classified_docs
try:
payload["docs"] = classified_docs
Expand Down Expand Up @@ -234,7 +261,7 @@ def _send_loader_doc(self, loading_end: bool = False) -> list:
except requests.exceptions.RequestException:
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: %s", e)
logger.warning("An Exception caught in _send_loader_doc: cloud %s", e)

if loading_end is True:
PebbloSafeLoader.set_loader_sent()
Expand Down Expand Up @@ -270,6 +297,12 @@ def _send_discover(self) -> None:
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=payload, timeout=20
)
if self.api_key:
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}/v1/discover"
headers.update({"x-api-key": self.api_key})
_ = requests.post(
pebblo_cloud_url, headers=headers, json=payload, timeout=20
)
logger.debug(
"send_discover[local]: request url %s, body %s len %s\
response status %s body %s",
Expand All @@ -287,8 +320,8 @@ def _send_discover(self) -> None:
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception:
logger.warning("An Exception caught in _send_discover.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)

if self.api_key:
try:
Expand Down Expand Up @@ -316,7 +349,7 @@ def _send_discover(self) -> None:
except requests.exceptions.RequestException:
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: %s", e)
logger.warning("An Exception caught in _send_discover: cloud %s", e)

def _get_app_details(self) -> App:
"""Fetch app details. Internal method.
Expand Down Expand Up @@ -378,3 +411,80 @@ def get_source_size(self, source_path: str) -> int:
total_size += os.path.getsize(fp)
size = total_size
return size

def _index_docs(self) -> List[IndexedDocument]:
    """
    Wrap every loaded document in an IndexedDocument with a unique id.

    The id is the document's position in ``self.docs`` rendered as a
    lowercase hexadecimal string, so classification results can later be
    matched back to their source documents.

    Returns:
        List[IndexedDocument]: Indexed copies of ``self.docs``.
    """
    indexed: List[IndexedDocument] = []
    for position, document in enumerate(self.docs):
        # format(n, "x") is the hex digits of n, identical to hex(n)[2:].
        indexed.append(IndexedDocument(id=format(position, "x"), **document.dict()))
    return indexed

def _add_semantic_to_docs(
    self, docs_with_id: List[IndexedDocument], classified_docs: List[dict]
) -> List[Document]:
    """
    Add semantic metadata to the given list of documents.

    Args:
        docs_with_id (List[IndexedDocument]): Documents carrying the ids
            assigned during indexing.
        classified_docs (List[dict]): Classification results from the
            pebblo server; each dict is matched to its document via the
            "id" key.

    Returns:
        List[Document]: Plain Document objects (the indexing id is
            dropped) with semantic metadata added where a classification
            result matched.
    """
    # Map id -> plain Document; the IndexedDocument id is dropped here.
    indexed_docs = {
        doc.id: Document(page_content=doc.page_content, metadata=doc.metadata)
        for doc in docs_with_id
    }

    for classified_doc in classified_docs:
        doc_id = classified_doc.get("id")
        if doc_id in indexed_docs:
            # Mutates the matched Document's metadata in place.
            self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc)

    # list() instead of the redundant identity comprehension (ruff C416).
    return list(indexed_docs.values())

def _unindex_docs(self, docs_with_id: List[IndexedDocument]) -> List[Document]:
    """
    Convert IndexedDocument objects back to plain Document objects,
    dropping the temporary indexing id.

    Args:
        docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects.

    Returns:
        List[Document]: A list of Document objects.
    """
    # The original enumerate() produced an index that was never used;
    # iterate the documents directly.
    return [
        Document(page_content=doc.page_content, metadata=doc.metadata)
        for doc in docs_with_id
    ]

def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document:
    """
    Attach semantic entity and topic names to a document's metadata in-place.

    Args:
        doc (Document): Document whose metadata is updated.
        classified_doc (dict): Classification result; its optional
            "entities" and "topics" mappings supply the names.

    Returns:
        Document: The same document, with semantic metadata set.
    """
    entities = classified_doc.get("entities", {})
    topics = classified_doc.get("topics", {})
    # Only the names (keys) are kept; the mapped values are discarded.
    doc.metadata["pebblo_semantic_entities"] = list(entities.keys())
    doc.metadata["pebblo_semantic_topics"] = list(topics.keys())
    return doc
5 changes: 5 additions & 0 deletions libs/community/langchain_community/utilities/pebblo.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import platform
from typing import Optional, Tuple

from langchain_core.documents import Document
from langchain_core.env import get_runtime_environment
from langchain_core.pydantic_v1 import BaseModel

Expand Down Expand Up @@ -61,6 +62,10 @@
logger = logging.getLogger(__name__)


class IndexedDocument(Document):
    """Document carrying a transient matching id.

    The id is assigned by ``PebbloSafeLoader._index_docs`` (the document's
    position rendered as a hex string) so classification results can be
    matched back to their source documents; ``_unindex_docs`` drops it again.
    """

    # Hex-string position of the document within the loaded batch.
    id: str


class Runtime(BaseModel):
"""Pebblo Runtime.

Expand Down
Loading