From 0cd3f9336164b0971625f19064d07fb08577bf40 Mon Sep 17 00:00:00 2001 From: Rahul Triptahi Date: Sat, 22 Jun 2024 05:33:38 +0530 Subject: [PATCH] Enhance metadata of sharepointLoader. (#22248) Description: 2 feature flags added to SharePointLoader in this PR: 1. load_auth: if set to True, adds authorised identities to metadata 2. load_extended_metadata, adds source, owner and full_path to metadata Unit tests:N/A Documentation: To be done. --------- Signed-off-by: Rahul Tripathi Co-authored-by: Rahul Tripathi --- .../document_loaders/base_o365.py | 21 ++- .../document_loaders/sharepoint.py | 146 ++++++++++++++---- 2 files changed, 134 insertions(+), 33 deletions(-) diff --git a/libs/community/langchain_community/document_loaders/base_o365.py b/libs/community/langchain_community/document_loaders/base_o365.py index 412076651d12c..3dab33b6166fa 100644 --- a/libs/community/langchain_community/document_loaders/base_o365.py +++ b/libs/community/langchain_community/document_loaders/base_o365.py @@ -124,6 +124,7 @@ def _load_from_folder(self, folder: Folder) -> Iterable[Blob]: "created_by": str(file.created_by), "modified_by": str(file.modified_by), "description": file.description, + "id": str(file.object_id), } loader = FileSystemBlobLoader(path=temp_dir) @@ -157,6 +158,7 @@ def _load_from_object_ids( the files loaded from the drive using the specified object_ids. """ file_mime_types = self._fetch_mime_types + metadata_dict: Dict[str, Dict[str, Any]] = {} with tempfile.TemporaryDirectory() as temp_dir: for object_id in object_ids: file = drive.get_item(object_id) @@ -169,8 +171,25 @@ def _load_from_object_ids( if file.is_file: if file.mime_type in list(file_mime_types.values()): file.download(to_path=temp_dir, chunk_size=self.chunk_size) + metadata_dict[file.name] = { + "source": file.web_url, + "mime_type": file.mime_type, + "created": file.created, + "modified": file.modified, + "created_by": str(file.created_by), + "modified_by": str(file.modified_by), + "description": file.description, + "id": str(file.object_id), + } + loader = FileSystemBlobLoader(path=temp_dir) - yield from loader.yield_blobs() + for blob in loader.yield_blobs(): + if not isinstance(blob.path, PurePath): + raise NotImplementedError("Expected blob path to be a PurePath") + if blob.path: + file_metadata_ = metadata_dict.get(str(blob.path.name), {}) + blob.metadata.update(file_metadata_) + yield blob def _auth(self) -> Account: """Authenticates the OneDrive API client diff --git a/libs/community/langchain_community/document_loaders/sharepoint.py b/libs/community/langchain_community/document_loaders/sharepoint.py index 03674f92f6888..cc75315a63784 100644 --- a/libs/community/langchain_community/document_loaders/sharepoint.py +++ b/libs/community/langchain_community/document_loaders/sharepoint.py @@ -6,7 +6,7 @@ from pathlib import Path from typing import Any, Iterator, List, Optional, Sequence -import requests +import requests # type: ignore from langchain_core.document_loaders import BaseLoader from langchain_core.documents import Document from langchain_core.pydantic_v1 import Field @@ -37,19 +37,31 @@ class SharePointLoader(O365BaseLoader, BaseLoader): """ The ID of the file for which we need auth identities""" site_id: Optional[str] = None """ The ID of the Sharepoint site of the user where the file is present """ + load_extended_metadata: Optional[bool] = False + """ Whether to load extended metadata. Size, Owner and full_path.""" @property def _file_types(self) -> Sequence[_FileType]: - """Return supported file types.""" + """Return supported file types. + Returns: + A sequence of supported file types. + """ return _FileType.DOC, _FileType.DOCX, _FileType.PDF @property def _scopes(self) -> List[str]: - """Return required scopes.""" + """Return required scopes. + Returns: + List[str]: A list of required scopes. + """ return ["sharepoint", "basic"] def lazy_load(self) -> Iterator[Document]: - """Load documents lazily. Use this when working at a large scale.""" + """ + Load documents lazily. Use this when working at a large scale. + Yields: + Document: A document object representing the parsed blob. + """ try: from O365.drive import Drive, Folder except ImportError: @@ -65,22 +77,47 @@ def lazy_load(self) -> Iterator[Document]: if not isinstance(target_folder, Folder): raise ValueError(f"There isn't a folder with path {self.folder_path}.") for blob in self._load_from_folder(target_folder): + file_id = str(blob.metadata.get("id")) if self.load_auth is True: - for parsed_blob in blob_parser.lazy_parse(blob): - auth_identities = self.authorized_identities() + auth_identities = self.authorized_identities(file_id) + if self.load_extended_metadata is True: + extended_metadata = self.get_extended_metadata(file_id) + for parsed_blob in blob_parser.lazy_parse(blob): + if self.load_auth is True: parsed_blob.metadata["authorized_identities"] = auth_identities - yield parsed_blob - else: - yield from blob_parser.lazy_parse(blob) + if self.load_extended_metadata is True: + parsed_blob.metadata.update(extended_metadata) + yield parsed_blob if self.folder_id: target_folder = drive.get_item(self.folder_id) if not isinstance(target_folder, Folder): raise ValueError(f"There isn't a folder with path {self.folder_path}.") for blob in self._load_from_folder(target_folder): - yield from blob_parser.lazy_parse(blob) + file_id = str(blob.metadata.get("id")) + if self.load_auth is True: + auth_identities = self.authorized_identities(file_id) + if self.load_extended_metadata is True: + extended_metadata = self.get_extended_metadata(file_id) + for parsed_blob in blob_parser.lazy_parse(blob): + if self.load_auth is True: + parsed_blob.metadata["authorized_identities"] = auth_identities + if self.load_extended_metadata is True: + parsed_blob.metadata.update(extended_metadata) + yield parsed_blob if self.object_ids: for blob in self._load_from_object_ids(drive, self.object_ids): - yield from blob_parser.lazy_parse(blob) + file_id = str(blob.metadata.get("id")) + if self.load_auth is True: + auth_identities = self.authorized_identities(file_id) + if self.load_extended_metadata is True: + extended_metadata = self.get_extended_metadata(file_id) + for parsed_blob in blob_parser.lazy_parse(blob): + if self.load_auth is True: + parsed_blob.metadata["authorized_identities"] = auth_identities + if self.load_extended_metadata is True: + parsed_blob.metadata.update(extended_metadata) + yield parsed_blob + if not (self.folder_path or self.folder_id or self.object_ids): target_folder = drive.get_root_folder() if not isinstance(target_folder, Folder): @@ -90,38 +127,83 @@ def lazy_load(self) -> Iterator[Document]: blob_part.metadata.update(blob.metadata) yield blob_part - def authorized_identities(self) -> List: + def authorized_identities(self, file_id: str) -> List: + """ + Retrieve the access identities (user/group emails) for a given file. + Args: + file_id (str): The ID of the file. + Returns: + List: A list of group names (email addresses) that have + access to the file. + """ data = self._fetch_access_token() access_token = data.get("access_token") url = ( - f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/" - f"drives/{self.document_library_id}/items/{self.file_id}/permissions" + "https://graph.microsoft.com/v1.0/drives" + f"/{self.document_library_id}/items/{file_id}/permissions" ) headers = {"Authorization": f"Bearer {access_token}"} - response = requests.request("GET", url, headers=headers, data={}) - groups_list = response.json() + response = requests.request("GET", url, headers=headers) + access_list = response.json() group_names = [] - for group_data in groups_list.get("value"): - if group_data.get("grantedToV2"): - if group_data.get("grantedToV2").get("siteGroup"): - site_data = group_data.get("grantedToV2").get("siteGroup") - # print(group_data) - group_names.append(site_data.get("displayName")) - elif group_data.get("grantedToV2").get("group") or ( - group_data.get("grantedToV2").get("user") - ): - site_data = group_data.get("grantedToV2").get("group") or ( - group_data.get("grantedToV2").get("user") - ) - # print(group_data) - group_names.append(site_data.get("displayName")) - + for access_data in access_list.get("value"): + if access_data.get("grantedToV2"): + site_data = ( + (access_data.get("grantedToV2").get("siteUser")) + or (access_data.get("grantedToV2").get("user")) + or (access_data.get("grantedToV2").get("group")) + ) + if site_data: + email = site_data.get("email") + if email: + group_names.append(email) return group_names def _fetch_access_token(self) -> Any: - with open(self.token_path) as f: + """ + Fetch the access token from the token file. + Returns: + The access token as a dictionary. + """ + with open(self.token_path, encoding="utf-8") as f: s = f.read() data = json.loads(s) return data + + def get_extended_metadata(self, file_id: str) -> dict: + """ + Retrieve extended metadata for a file in SharePoint. + As of today, following fields are supported in the extended metadata: + - size: size of the source file. + - owner: display name of the owner of the source file. + - full_path: pretty human readable path of the source file. + Args: + file_id (str): The ID of the file. + Returns: + dict: A dictionary containing the extended metadata of the file, + including size, owner, and full path. + """ + data = self._fetch_access_token() + access_token = data.get("access_token") + url = ( + "https://graph.microsoft.com/v1.0/drives/" + f"{self.document_library_id}/items/{file_id}" + "?$select=size,createdBy,parentReference,name" + ) + headers = {"Authorization": f"Bearer {access_token}"} + response = requests.request("GET", url, headers=headers) + metadata = response.json() + staged_metadata = { + "size": metadata.get("size", 0), + "owner": metadata.get("createdBy", {}) + .get("user", {}) + .get("displayName", ""), + "full_path": metadata.get("parentReference", {}) + .get("path", "") + .split(":")[-1] + + "/" + + metadata.get("name", ""), + } + return staged_metadata