From 34e6427ae04801824530754d9462793a171ad81d Mon Sep 17 00:00:00 2001
From: Luciano Silveira
Date: Wed, 23 Oct 2024 10:21:00 -0300
Subject: [PATCH] Feature/detect file duplication (#42)

* add hash
* add file utils
* add detect_file_duplication s3 implementation
---
 amazon_s3/s3_config.md       |  1 +
 amazon_s3/s3reader.py        | 12 ++++++-
 saia_ingest/config.py        |  1 +
 saia_ingest/file_utils.py    | 68 ++++++++++++++++++++++++++++++++++++
 saia_ingest/ingestor.py      | 15 ++++++++
 saia_ingest/profile_utils.py |  4 +++
 6 files changed, 100 insertions(+), 1 deletion(-)
 create mode 100644 saia_ingest/file_utils.py

diff --git a/amazon_s3/s3_config.md b/amazon_s3/s3_config.md
index eee9860..fe15709 100644
--- a/amazon_s3/s3_config.md
+++ b/amazon_s3/s3_config.md
@@ -23,6 +23,7 @@ s3: # contact the provider for the following information
   reprocess_failed_files_reference: !!str 'string' # Full path to a file or URL to Saia GetDocuments API
   reprocess_valid_status_list: # List of Statuses to process, valid values Unknown, Starting, Failed, Pending, Success
   delete_local_folder: !!bool True|False (default) # Delete temporary folder if created
+  detect_file_duplication: !!bool True|False (default) # generate and compare a file hash on upload, discard duplications
   excluded_exts: # list of excluded extensions, by default it is suggested to include the following: raw, metadata
     - !!str 'metadata'
     - !!str 'raw'
diff --git a/amazon_s3/s3reader.py b/amazon_s3/s3reader.py
index 9808946..8a9f99a 100644
--- a/amazon_s3/s3reader.py
+++ b/amazon_s3/s3reader.py
@@ -11,8 +11,10 @@
 from pathlib import Path
 from typing import Any, Callable, Dict, List, Optional, Union
 from urllib.parse import quote, unquote
+from saia_ingest.config import Defaults
 from saia_ingest.utils import detect_file_extension
 from saia_ingest.profile_utils import is_valid_profile, file_upload, file_delete, operation_log_upload, sync_failed_files, get_bearer_token, get_json_response_from_url
+from saia_ingest.file_utils import calculate_file_hash
 
 from llama_index import download_loader
 from llama_index.readers.base import BaseReader
@@ -57,6 +59,7 @@ def __init__(
         source_base_url: Optional[str] = None,
         source_doc_id: Optional[str] = None,
         alternative_document_service: Optional[Dict[str, str]] = None,
+        detect_file_duplication: Optional[bool] = False,
         **kwargs: Any,
     ) -> None:
         """Initialize S3 bucket and key, along with credentials if needed.
@@ -118,6 +121,7 @@ def __init__(
         self.alternative_document_service = alternative_document_service
         self.source_base_url = source_base_url
         self.source_doc_id = source_doc_id
+        self.detect_file_duplication = detect_file_duplication
 
         self.s3 = None
         self.s3_client = None
@@ -595,7 +599,7 @@ def rename_file(
         s3_file = key_prefix + file_name
         initial_metadata = self.get_metadata(s3_file)
         if self.use_augment_metadata:
-            user_metadata = self.augment_metadata(file_name, initial_metadata, timestamp_tag)
+            user_metadata = self.augment_metadata(folder_path, file_name, initial_metadata, timestamp_tag)
             extension_from_metadata = user_metadata.get(extension_tag, None)
             if user_metadata:
                 self.write_object_to_file(user_metadata, metadata_file_path)
@@ -627,6 +631,7 @@ def rename_file(
 
     def augment_metadata(
         self,
+        folder_path: str,
         document_name: str,
         input_metadata: dict,
         timestamp_tag: str = 'publishdate',
@@ -669,6 +674,11 @@ def augment_metadata(
             initial_metadata[timestamp_tag] = formatted_date
             initial_metadata['year'] = year
 
+        if self.detect_file_duplication:
+            file_path = f"{folder_path}/{document_name}"
+            file_hash = calculate_file_hash(file_path)
+            initial_metadata[Defaults.FILE_HASH] = file_hash
+
         if self.source_base_url is not None and self.source_doc_id is not None:
             if doc_url is not None:
                 initial_metadata['url'] = doc_url
diff --git a/saia_ingest/config.py b/saia_ingest/config.py
index 0984313..b24e5cb 100644
--- a/saia_ingest/config.py
+++ b/saia_ingest/config.py
@@ -57,3 +57,4 @@ class Defaults:
     PACKAGE_DESCRIPTION = "GeneXus Enterprise AI"
     PACKAGE_URL = "https://github.com/genexuslabs/saia-ingest/blob/main/README.md"
     PACKAGE_METADATA_POSTFIX = ".saia.metadata"
+    FILE_HASH = "hash"
diff --git a/saia_ingest/file_utils.py b/saia_ingest/file_utils.py
new file mode 100644
index 0000000..81fca7e
--- /dev/null
+++ b/saia_ingest/file_utils.py
@@ -0,0 +1,68 @@
+import json
+from pathlib import Path
+from typing import Dict, Any
+import hashlib
+import logging
+
+from saia_ingest.config import Defaults
+
+
+def calculate_file_hash(filepath: Path) -> str:
+    """Calculate the SHA-256 hash of a file."""
+    sha256 = hashlib.sha256()
+    try:
+        with open(filepath, 'rb') as f:
+            while chunk := f.read(4096):
+                sha256.update(chunk)
+    except FileNotFoundError:
+        logging.getLogger().error(f"File not found: {filepath}")
+        raise
+    except PermissionError:
+        logging.getLogger().error(f"Permission denied: {filepath}")
+        raise
+    return sha256.hexdigest()
+
+
+def are_files_identical(file1: Path, file2: Path) -> bool:
+    """Compare two files to see if they are identical by comparing their SHA-256 hashes."""
+    try:
+        return calculate_file_hash(file1) == calculate_file_hash(file2)
+    except Exception as e:
+        logging.getLogger().error(f"Error comparing files {file1} and {file2}: {e}")
+        return False
+
+
+def load_hashes_from_json(folder: Path) -> Dict[str, Any]:
+    """Load all existing hashes from JSON files in the folder."""
+    hash_index = {}
+    for json_file in folder.glob("*.json"):
+        try:
+            with json_file.open('r') as f:
+                data = json.load(f)
+                if Defaults.FILE_HASH in data:
+                    file_hash = data[Defaults.FILE_HASH]
+                    document_id = data["documentid"]
+                    if file_hash in hash_index:
+                        logging.getLogger().warning(f"{document_id} duplicate detected: using {hash_index[file_hash]}")
+                    else:
+                        hash_index[file_hash] = document_id
+        except (json.JSONDecodeError, FileNotFoundError) as e:
+            print(f"Error reading {json_file}: {e}")
+    return hash_index
+
+
+def check_for_duplicates(new_file: Path, folder: Path) -> bool:
+    """Check if the hash of a new file collides with existing hashes."""
+    # Load all existing hashes
+    hash_index = load_hashes_from_json(Path(folder))
+
+    # Calculate the hash of the new file
+    new_file_hash = calculate_file_hash(new_file)
+
+    if new_file_hash in hash_index:
+        print(f"Duplicate found! {new_file} matches {hash_index[new_file_hash]}")
+        return True
+    else:
+        print(f"No duplicates found for {new_file}.")
+        return False
+
diff --git a/saia_ingest/ingestor.py b/saia_ingest/ingestor.py
index a84d80f..a276435 100644
--- a/saia_ingest/ingestor.py
+++ b/saia_ingest/ingestor.py
@@ -1,4 +1,5 @@
 import os
+from pathlib import Path
 import time
 from datetime import datetime, timezone
 import json
@@ -13,6 +14,7 @@
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 
 from saia_ingest.config import Defaults
+from saia_ingest.file_utils import calculate_file_hash, load_hashes_from_json
 from saia_ingest.profile_utils import is_valid_profile, file_upload, file_delete, operation_log_upload, sync_failed_files, search_failed_to_delete
 from saia_ingest.rag_api import RagApi
 from saia_ingest.utils import get_yaml_config, get_metadata_file, load_json_file, search_failed_files, find_value_by_key
@@ -473,6 +475,7 @@ def ingest_s3(
     download_dir = s3_level.get('download_dir', None)
     verbose = s3_level.get('verbose', False)
     delete_downloaded_files = s3_level.get('delete_downloaded_files', False)
+    detect_file_duplication = s3_level.get('detect_file_duplication', False)
 
     # Saia
     saia_level = config.get('saia', {})
@@ -510,6 +513,7 @@ def ingest_s3(
         source_doc_id=source_doc_id,
         alternative_document_service=alternative_document_service,
         download_dir=download_dir,
+        detect_file_duplication=detect_file_duplication,
         verbose=verbose
     )
     loader.init_s3()
@@ -541,6 +545,17 @@ def ingest_s3(
         file_paths = loader.get_files() if loader.alternative_document_service is None else loader.get_files_from_url()
         saia_file_ids_to_delete = search_failed_to_delete(file_paths)
 
+        if detect_file_duplication and len(file_paths) > 0:
+            hash_index = load_hashes_from_json(Path(download_dir))
+            for new_file in file_paths:
+                new_file_hash = calculate_file_hash(new_file)
+                if new_file_hash in hash_index:
+                    document_id = hash_index[new_file_hash]
+                    file_name = os.path.basename(new_file)
+                    saia_file_ids_to_delete.append(hash_index[new_file_hash])
+                    file_paths.remove(new_file)
+                    logging.getLogger().warning(f"{file_name} duplicate discarded, using {document_id}")
+
         with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_executions) as executor:
             futures = [executor.submit(ragApi.delete_profile_document, id, saia_profile) for id in saia_file_ids_to_delete]
             concurrent.futures.wait(futures)
diff --git a/saia_ingest/profile_utils.py b/saia_ingest/profile_utils.py
index ee6d223..d7e35c3 100644
--- a/saia_ingest/profile_utils.py
+++ b/saia_ingest/profile_utils.py
@@ -5,6 +5,8 @@
 import requests
 import urllib3
 import json
+
+from saia_ingest.file_utils import calculate_file_hash
 from .log import AccumulatingLogHandler
 from .config import DefaultHeaders, Defaults
 
@@ -80,6 +82,8 @@ def file_upload(
             ret = False
         else:
             if save_answer:
+                file_crc = calculate_file_hash(file_path)
+                response_body[Defaults.FILE_HASH] = file_crc
                 with open(file_path + Defaults.PACKAGE_METADATA_POSTFIX, 'w') as file:
                     file.write(json.dumps(response_body, indent=2))
     end_time = time.time()
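
Below is a minimal usage sketch of the helpers introduced in saia_ingest/file_utils.py, assuming the saia-ingest package is importable and that the folder already contains the JSON sidecar files that load_hashes_from_json scans; the folder and file names are illustrative only, not part of the patch.

    from pathlib import Path

    from saia_ingest.file_utils import (
        calculate_file_hash,
        check_for_duplicates,
        load_hashes_from_json,
    )

    download_dir = Path("downloads")          # hypothetical local folder
    candidate = download_dir / "report.pdf"   # hypothetical newly downloaded file

    # SHA-256 hex digest of the candidate file.
    print(calculate_file_hash(candidate))

    # Map of hash -> documentid built from the *.json sidecar files in the folder.
    hash_index = load_hashes_from_json(download_dir)

    # True (and a message is printed) when the candidate's hash is already indexed.
    if check_for_duplicates(candidate, download_dir):
        print("Skipping upload: identical content was already ingested.")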
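
The ingest_s3 hunk wires the same idea into the batch flow: it builds a hash index from the download directory and, for each downloaded file whose hash is already indexed, records the matching document id and drops the file from the upload batch. The following is a condensed, standalone sketch of that flow; the drop_duplicates helper and its return shape are illustrative and not part of the patch, and it collects results into new lists rather than removing entries from file_paths while iterating over it.

    import logging
    import os
    from pathlib import Path
    from typing import List, Tuple

    from saia_ingest.file_utils import calculate_file_hash, load_hashes_from_json


    def drop_duplicates(file_paths: List[str], download_dir: str) -> Tuple[List[str], List[str]]:
        """Split a batch of downloaded files into (files to ingest, duplicate document ids)."""
        hash_index = load_hashes_from_json(Path(download_dir))
        kept, duplicate_ids = [], []
        for new_file in file_paths:
            file_hash = calculate_file_hash(new_file)
            if file_hash in hash_index:
                # Hash already known: remember the existing document id and skip the file.
                document_id = hash_index[file_hash]
                duplicate_ids.append(document_id)
                logging.getLogger().warning(
                    f"{os.path.basename(new_file)} duplicate discarded, using {document_id}"
                )
            else:
                kept.append(new_file)
        return kept, duplicate_ids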