From 4708a30f5cdf14175c2737b38f66222c5ebf863a Mon Sep 17 00:00:00 2001
From: Luciano Silveira
Date: Tue, 22 Oct 2024 16:23:49 -0300
Subject: [PATCH] Feature/file system loader (#41)

* use Defaults for saia.metadata
* add file system minimal options
---
 README.md | 1 +
 fs/filesystem_config.md | 29 +++
 fs/simple_folder_reader.py | 305 +++++++++++++++++++++++
 saia_ingest/command_line/command_line.py | 4 +-
 saia_ingest/config.py | 1 +
 saia_ingest/ingestor.py | 88 ++++++-
 saia_ingest/profile_utils.py | 6 +-
 saia_ingest/utils.py | 3 +-
 8 files changed, 430 insertions(+), 7 deletions(-)
 create mode 100644 fs/filesystem_config.md
 create mode 100644 fs/simple_folder_reader.py

diff --git a/README.md b/README.md
index 1106a9c..1177040 100644
--- a/README.md
+++ b/README.md
@@ -81,6 +81,7 @@ saia-cli ingest -c ./config/s3_sandbox.yaml --type test
 
 The configuration file details all parameters needed to run the ingestion, use the `--type` to decide the target ingestion; supported data sources are:
 
+ * `fs` (file system) [config](./fs/filesystem_config.md)
 * `s3` [config](./amazon_s3/s3_config.md)
 * `jira` [config](./atlassian_jira/jira_config.md)
 * `confluence` [config](./atlassian_confluence/confluence_config.md)
diff --git a/fs/filesystem_config.md b/fs/filesystem_config.md
new file mode 100644
index 0000000..c11cc6a
--- /dev/null
+++ b/fs/filesystem_config.md
@@ -0,0 +1,29 @@
+## File System
+
+Create a `yaml` file under the `config` folder with the following parameters; let's assume `fs_sandbox.yaml`:
+
+```yaml
+fs:
+  input_dir: !!str 'string' # Full path to a folder
+  required_exts: # list of required extensions
+    - !!str .md # just an example, include the "dot"!
+  recursive: True|False (default)
+  delete_local_folder: True|False (default)
+  use_metadata_file: True|False (default)
+saia:
+  base_url: !!str 'string' # GeneXus Enterprise AI Base URL
+  api_token: !!str 'string'
+  profile: !!str 'string' # Must match the RAG assistant ID
+  max_parallel_executions: !!int 5
+  upload_operation_log: !!bool False|True (default) # Check the operations log for detail if enabled
+```
+
+The process reads files from the file system as defined in the `fs` section and uploads them to the `Enterprise AI` endpoint detailed in the `saia` section.
+
+### Execution
+
+Example execution considering only files modified since `yesterday`:
+
+```bash
+saia-cli ingest -c ./config/fs_sandbox.yaml --type fs --days 1
+```
diff --git a/fs/simple_folder_reader.py b/fs/simple_folder_reader.py
new file mode 100644
index 0000000..d5fdf49
--- /dev/null
+++ b/fs/simple_folder_reader.py
@@ -0,0 +1,305 @@
+from datetime import datetime, timezone
+from functools import reduce
+from itertools import repeat
+import warnings
+import fsspec
+from fsspec.implementations.local import LocalFileSystem
+import multiprocessing
+from pathlib import Path, PurePosixPath
+from typing import Any, Callable, Dict, Generator, List, Optional, Set, Type, Union
+
+from llama_index import Document
+
+from saia_ingest.config import Defaults
+
+
+class SimpleDirectoryReader():
+    """
+    Simple directory reader.
+
+    Load files from a file directory.
+
+    Args:
+        input_dir (Union[Path, str]): Path to the directory.
+        exclude (List): glob of python file paths to exclude (Optional)
+        exclude_hidden (bool): Whether to exclude hidden files (dotfiles).
+        encoding (str): Encoding of the files, defaults to utf-8.
+        recursive (bool): Whether to recursively search in subdirectories, False by default.
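+        timestamp (Optional[datetime]): Only load files modified at or after this timestamp; older files are skipped. Defaults to None (no filtering).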
+ required_exts (Optional[List[str]]): List of required extensions, defaults to None. + num_files_limit (Optional[int]): Maximum number of files to read, defaults to None. + fs (Optional[fsspec.AbstractFileSystem]): File system to use. Defaults + """ + + def __init__( + self, + input_dir: Optional[Union[Path, str]] = None, + exclude: Optional[List] = None, + exclude_hidden: bool = True, + recursive: bool = False, + encoding: str = "utf-8", + required_exts: Optional[List[str]] = None, + num_files_limit: Optional[int] = None, + fs: Optional[fsspec.AbstractFileSystem] = None, + timestamp: Optional[datetime] = None + ) -> None: + """Initialize with parameters.""" + + if not input_dir: + raise ValueError("Must provide `input_dir`.") + + self.fs = fs or get_default_fs() + self.encoding = encoding + self.saia_metadata_exclusion = [Defaults.PACKAGE_METADATA_POSTFIX] + + self.exclude = exclude if exclude is not None else self.saia_metadata_exclusion + self.recursive = recursive + self.exclude_hidden = exclude_hidden + self.required_exts = required_exts + self.num_files_limit = num_files_limit + self.timestamp = timestamp + + if input_dir: + if not self.fs.isdir(input_dir): + raise ValueError(f"Directory {input_dir} does not exist.") + self.input_dir = Path(input_dir) + self.exclude = exclude + self.input_files = self._add_files(self.input_dir) + + def is_hidden(self, path: Path) -> bool: + return any( + part.startswith(".") and part not in [".", ".."] for part in path.parts + ) + + def _add_files(self, input_dir: Path) -> List[Path]: + """Add files.""" + all_files: Set[Path] = set() + rejected_files: Set[Path] = set() + rejected_dirs: Set[Path] = set() + # Default to POSIX paths for non-default file systems (e.g. S3) + _Path = Path if is_default_fs(self.fs) else PurePosixPath + + if self.exclude is not None: + for excluded_pattern in self.exclude: + if self.recursive: + # Recursive glob + excluded_glob = _Path(input_dir) / _Path("**") / excluded_pattern + else: + # Non-recursive glob + excluded_glob = _Path(input_dir) / excluded_pattern + for file in self.fs.glob(str(excluded_glob)): + if self.fs.isdir(file): + rejected_dirs.add(_Path(file)) + else: + rejected_files.add(_Path(file)) + + file_refs: List[str] = [] + if self.recursive: + file_refs = self.fs.glob(str(input_dir) + "/**/*") + else: + file_refs = self.fs.glob(str(input_dir) + "/*") + + for _ref in file_refs: + ref = _Path(_ref) + is_dir = self.fs.isdir(ref) + skip_because_hidden = self.exclude_hidden and self.is_hidden(ref) + skip_because_bad_ext = ( + self.required_exts is not None and ref.suffix not in self.required_exts + ) + skip_because_excluded = ref in rejected_files + if not skip_because_excluded: + if is_dir: + ref_parent_dir = ref + else: + ref_parent_dir = self.fs._parent(ref) + for rejected_dir in rejected_dirs: + if str(ref_parent_dir).startswith(str(rejected_dir)): + skip_because_excluded = True + break + + if ( + is_dir + or skip_because_hidden + or skip_because_bad_ext + or skip_because_excluded + ): + continue + + if self.timestamp is not None: + file_modified_time = datetime.fromtimestamp(ref.stat().st_mtime).replace(tzinfo=timezone.utc) + if file_modified_time < self.timestamp: + continue + + all_files.add(ref) + + new_input_files = sorted(all_files) + + if self.num_files_limit is not None and self.num_files_limit > 0: + new_input_files = new_input_files[0 : self.num_files_limit] + + return new_input_files + + def _exclude_metadata(self, documents: List[Document]) -> List[Document]: + """ + Exclude metadata from 
documents. + + Args: + documents (List[Document]): List of documents. + """ + for doc in documents: + # Keep only metadata['file_path'] in both embedding and llm content + # str, which contain extreme important context that about the chunks. + # Dates is provided for convenience of postprocessor such as + # TimeWeightedPostprocessor, but excluded for embedding and LLMprompts + doc.excluded_embed_metadata_keys.extend( + [ + "file_name", + "file_type", + "file_size", + "creation_date", + "last_modified_date", + "last_accessed_date", + ] + ) + doc.excluded_llm_metadata_keys.extend( + [ + "file_name", + "file_type", + "file_size", + "creation_date", + "last_modified_date", + "last_accessed_date", + ] + ) + + return documents + + def list_resources(self, *args: Any, **kwargs: Any) -> List[str]: + """List files in the given filesystem.""" + return [str(x) for x in self.input_files] + + def load_resource( + self, resource_id: str, *args: Any, **kwargs: Any + ) -> List[Document]: + encoding = kwargs.get("encoding", self.encoding) + fs = kwargs.get("fs", self.fs) + + path_func = Path + + return SimpleDirectoryReader.load_file( + input_file=path_func(resource_id), + encoding=encoding, + fs=fs, + **kwargs, + ) + + def read_file_content(self, input_file: Path, **kwargs: Any) -> bytes: + """Read file content.""" + with self.fs.open(input_file, errors=self.errors, encoding=self.encoding) as f: + return f.read() + + @staticmethod + def load_file( + input_file: Path, + encoding: str = "utf-8", + errors: str = "ignore", + fs: Optional[fsspec.AbstractFileSystem] = None, + ) -> List[Document]: + """ + Static method for loading file. + + Returns: + List[Document]: loaded documents + """ + + metadata: Optional[dict] = None + documents: List[Document] = [] + + fs = fs or get_default_fs() + with fs.open(input_file, errors=errors, encoding=encoding) as f: + data = f.read().decode(encoding, errors=errors) + + doc = Document(text=data, metadata=metadata or {}) + + documents.append(doc) + + return documents + + def load_data( + self, + num_workers: Optional[int] = None, + fs: Optional[fsspec.AbstractFileSystem] = None, + ) -> List[Document]: + """ + Load data from the input directory. + + Args: + num_workers (Optional[int]): Number of workers to parallelize data-loading over. + fs (Optional[fsspec.AbstractFileSystem]): File system to use. If fs was specified + in the constructor, it will override the fs parameter here. + + Returns: + List[Document]: A list of documents. + """ + documents = [] + + files_to_process = self.input_files + fs = fs or self.fs + + if num_workers and num_workers > 1: + if num_workers > multiprocessing.cpu_count(): + warnings.warn( + "Specified num_workers exceed number of CPUs in the system. " + "Setting `num_workers` down to the maximum CPU count." + ) + with multiprocessing.get_context("spawn").Pool(num_workers) as p: + results = p.starmap( + SimpleDirectoryReader.load_file, + zip( + files_to_process, + repeat(self.encoding), + repeat(fs), + ), + ) + documents = reduce(lambda x, y: x + y, results) + + else: + for input_file in files_to_process: + documents.extend( + SimpleDirectoryReader.load_file( + input_file=input_file, + encoding=self.encoding, + fs=fs, + ) + ) + + return self._exclude_metadata(documents) + + def iter_data( + self + ) -> Generator[List[Document], Any, Any]: + """ + Load data iteratively from the input directory. + + Returns: + Generator[List[Document]]: A list of documents. 
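+
+        Example (illustrative; the path and extension are placeholders):
+
+            reader = SimpleDirectoryReader(input_dir="./docs", required_exts=[".md"])
+            for docs in reader.iter_data():
+                print(len(docs))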
+ """ + files_to_process = self.input_files + + for input_file in files_to_process: + documents = SimpleDirectoryReader.load_file( + input_file=input_file, + encoding=self.encoding, + fs=self.fs, + ) + + documents = self._exclude_metadata(documents) + + if len(documents) > 0: + yield documents + + +def get_default_fs() -> fsspec.AbstractFileSystem: + return LocalFileSystem() + +def is_default_fs(fs: fsspec.AbstractFileSystem) -> bool: + return isinstance(fs, LocalFileSystem) and not fs.auto_mkdir diff --git a/saia_ingest/command_line/command_line.py b/saia_ingest/command_line/command_line.py index 4e1b075..02a9560 100644 --- a/saia_ingest/command_line/command_line.py +++ b/saia_ingest/command_line/command_line.py @@ -6,7 +6,7 @@ import logging from logging.handlers import RotatingFileHandler -from ..ingestor import ingest_s3, ingest_jira, ingest_confluence, ingest_github, ingest_gdrive, ingest_sharepoint +from ..ingestor import ingest_s3, ingest_jira, ingest_confluence, ingest_github, ingest_gdrive, ingest_sharepoint, ingest_file_system from ..log import AccumulatingLogHandler logging.basicConfig(level=logging.INFO) @@ -55,6 +55,8 @@ def handle_ingest( ret = ingest_github(config_file) elif type == "gdrive": ret = ingest_gdrive(config_file) + elif type == "fs": + ret = ingest_file_system(config_file, timestamp=timestamp) else: logging.getLogger().error(f"Unknown {type} type") return False diff --git a/saia_ingest/config.py b/saia_ingest/config.py index e01b752..0984313 100644 --- a/saia_ingest/config.py +++ b/saia_ingest/config.py @@ -56,3 +56,4 @@ class DefaultLLM: class Defaults: PACKAGE_DESCRIPTION = "GeneXus Enterprise AI" PACKAGE_URL = "https://github.com/genexuslabs/saia-ingest/blob/main/README.md" + PACKAGE_METADATA_POSTFIX = ".saia.metadata" diff --git a/saia_ingest/ingestor.py b/saia_ingest/ingestor.py index 27471e6..a84d80f 100644 --- a/saia_ingest/ingestor.py +++ b/saia_ingest/ingestor.py @@ -12,6 +12,7 @@ from langchain_community.vectorstores import Pinecone from langchain.text_splitter import RecursiveCharacterTextSplitter +from saia_ingest.config import Defaults from saia_ingest.profile_utils import is_valid_profile, file_upload, file_delete, operation_log_upload, sync_failed_files, search_failed_to_delete from saia_ingest.rag_api import RagApi from saia_ingest.utils import get_yaml_config, get_metadata_file, load_json_file, search_failed_files, find_value_by_key @@ -23,6 +24,7 @@ from gdrive.gdrive_reader import GoogleDriveReader from llama_hub.github_repo import GithubClient, GithubRepositoryReader +from fs.simple_folder_reader import SimpleDirectoryReader from saia_ingest.config import DefaultVectorStore @@ -131,7 +133,7 @@ def ingest_jira( lc_documents = all_documents if len(lc_documents) <= 0: - logging.getLogger().warn('No documents found') + logging.getLogger().warning('No documents found') return ret docs_file = save_to_file(lc_documents, prefix='jira') @@ -634,7 +636,7 @@ def ingest_gdrive( doc_count = len(file_paths) if doc_count <= 0: - logging.getLogger().warn('No documents found') + logging.getLogger().warning('No documents found') return ret path = os.path.dirname(file_paths[0]) @@ -682,6 +684,88 @@ def ingest_gdrive( finally: return ret +def ingest_file_system( + configuration: str, + timestamp: datetime = None, + ) -> bool: + ret = True + try: + start_time = time.time() + + config = get_yaml_config(configuration) + fs_level = config.get('fs', {}) + input_dir = fs_level.get('input_dir', None) + required_exts = fs_level.get('required_exts', None) + recursive = 
fs_level.get('recursive', False) + delete_local_folder = fs_level.get('delete_local_folder', False) + num_files_limit = fs_level.get('num_files_limit', None) + use_metadata_file = fs_level.get('use_metadata_file', False) + + # https://docs.llamaindex.ai/en/stable/examples/data_connectors/simple_directory_reader/ + loader = SimpleDirectoryReader( + input_dir=input_dir, + required_exts=required_exts, + recursive=recursive, + num_files_limit=num_files_limit, + timestamp=timestamp + ) + + doc_count = len(loader.input_files) + if doc_count <= 0: + logging.getLogger().warning('No documents found') + return ret + + logging.getLogger().info(f"Found {doc_count} files in {input_dir}") + + # Saia + saia_level = config.get('saia', {}) + saia_base_url = saia_level.get('base_url', None) + saia_api_token = saia_level.get('api_token', None) + saia_profile = saia_level.get('profile', None) + max_parallel_executions = saia_level.get('max_parallel_executions', 5) + upload_operation_log = saia_level.get('upload_operation_log', False) + + if saia_base_url is None: + logging.getLogger().error(f"Missing '{Defaults.PACKAGE_DESCRIPTION}' configuration") + logging.getLogger().error(f"Review configuration {Defaults.PACKAGE_URL}") + ret = False + return ret + + ret = is_valid_profile(saia_base_url, saia_api_token, saia_profile) + if ret is False: + logging.getLogger().error(f"Invalid profile {saia_profile}") + ret = False + return ret + + file_paths = os.path.dirname(loader.input_files[0]) + + ragApi = RagApi(saia_base_url, saia_api_token, saia_profile) + file_ids = [str(path) for path in loader.input_files] + saia_file_ids_to_delete = search_failed_to_delete(file_ids) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_executions) as executor: + futures = [executor.submit(ragApi.delete_profile_document, id, saia_profile) for id in saia_file_ids_to_delete] + concurrent.futures.wait(futures) + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_executions) as executor: + futures = [executor.submit(saia_file_upload, saia_base_url, saia_api_token, saia_profile, file_item, use_metadata_file) for file_item in loader.input_files] + concurrent.futures.wait(futures) + + if delete_local_folder and doc_count > 0: + shutil.rmtree(file_paths) + + if upload_operation_log: + end_time = time.time() + message_response = f"bulk ingest ({end_time - start_time:.2f}s)" + ret = operation_log_upload(saia_base_url, saia_api_token, saia_profile, "ALL", message_response, 0) + + return ret + + except Exception as e: + logging.getLogger().error(f"Error: {type(e)} {e}") + ret = False + finally: + return ret + def ingest_sharepoint( configuration: str, start_time: datetime, diff --git a/saia_ingest/profile_utils.py b/saia_ingest/profile_utils.py index b488ed0..ee6d223 100644 --- a/saia_ingest/profile_utils.py +++ b/saia_ingest/profile_utils.py @@ -6,7 +6,7 @@ import urllib3 import json from .log import AccumulatingLogHandler -from .config import DefaultHeaders +from .config import DefaultHeaders, Defaults # Suppress the InsecureRequestWarning urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -80,7 +80,7 @@ def file_upload( ret = False else: if save_answer: - with open(file_path + '.saia.metadata', 'w') as file: + with open(file_path + Defaults.PACKAGE_METADATA_POSTFIX, 'w') as file: file.write(json.dumps(response_body, indent=2)) end_time = time.time() metadata_elements = response_body.get('metadata', []) @@ -333,7 +333,7 @@ def search_failed_to_delete(files: list[str]) -> list[str]: 
"""Check if local metadata exists and return a list of Document ids to delete""" file_list = [] for file in files: - item_file_metadata = f"{file}.saia.metadata" + item_file_metadata = f"{file}{Defaults.PACKAGE_METADATA_POSTFIX}" if os.path.exists(item_file_metadata): with open(item_file_metadata, 'r') as f: try: diff --git a/saia_ingest/utils.py b/saia_ingest/utils.py index e2d9bcc..38cecfb 100644 --- a/saia_ingest/utils.py +++ b/saia_ingest/utils.py @@ -5,6 +5,7 @@ import yaml import requests import chardet +from .config import Defaults def get_yaml_config(yaml_file): # Load the configuration from the YAML file @@ -68,7 +69,7 @@ def search_failed_files(directory, failed_status): file_list = [] for root, _, files in os.walk(directory): for file in files: - if file.endswith('.saia.metadata'): + if file.endswith(Defaults.PACKAGE_METADATA_POSTFIX): file_path = os.path.join(root, file) with open(file_path, 'r') as f: try: