Feature/detect file duplication (#42)
* add hash
* add file utils
* add detect_file_duplication s3 implementation
ladrians authored Oct 23, 2024
1 parent 4708a30 commit 34e6427
Showing 6 changed files with 100 additions and 1 deletion.
1 change: 1 addition & 0 deletions amazon_s3/s3_config.md
@@ -23,6 +23,7 @@ s3: # contact the provider for the following information
  reprocess_failed_files_reference: !!str 'string' # Full path to a file or URL to Saia GetDocuments API
  reprocess_valid_status_list: # List of Statuses to process, valid values Unknown, Starting, Failed, Pending, Success
  delete_local_folder: !!bool True|False (default) # Delete temporary folder if created
  detect_file_duplication: !!bool True|False (default) # generate a file hash on upload and compare it with existing hashes; discard duplicates
  excluded_exts: # list of excluded extensions, by default it is suggested to include the following: raw, metadata
    - !!str 'metadata'
    - !!str 'raw'
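For reference, a minimal sketch of the behavior this option enables, using only the standard library (the helper name is hypothetical and not part of this commit; the real implementation in file_utils.py below streams the file in 4 KB chunks): hash each file's content with SHA-256 and discard any file whose hash was already recorded.

import hashlib
from pathlib import Path

def is_duplicate(path: Path, seen_hashes: set) -> bool:
    # Hypothetical helper: True if this file's content was already ingested.
    digest = hashlib.sha256(path.read_bytes()).hexdigest()  # whole file read at once for brevity
    if digest in seen_hashes:
        return True  # same content already seen: discard
    seen_hashes.add(digest)
    return False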
12 changes: 11 additions & 1 deletion amazon_s3/s3reader.py
@@ -11,8 +11,10 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import quote, unquote
from saia_ingest.config import Defaults
from saia_ingest.utils import detect_file_extension
from saia_ingest.profile_utils import is_valid_profile, file_upload, file_delete, operation_log_upload, sync_failed_files, get_bearer_token, get_json_response_from_url
from saia_ingest.file_utils import calculate_file_hash

from llama_index import download_loader
from llama_index.readers.base import BaseReader
@@ -57,6 +59,7 @@ def __init__(
        source_base_url: Optional[str] = None,
        source_doc_id: Optional[str] = None,
        alternative_document_service: Optional[Dict[str, str]] = None,
        detect_file_duplication: Optional[bool] = False,
        **kwargs: Any,
    ) -> None:
        """Initialize S3 bucket and key, along with credentials if needed.
@@ -118,6 +121,7 @@ def __init__(
        self.alternative_document_service = alternative_document_service
        self.source_base_url = source_base_url
        self.source_doc_id = source_doc_id
        self.detect_file_duplication = detect_file_duplication

        self.s3 = None
        self.s3_client = None
@@ -595,7 +599,7 @@ def rename_file(
        s3_file = key_prefix + file_name
        initial_metadata = self.get_metadata(s3_file)
        if self.use_augment_metadata:
            user_metadata = self.augment_metadata(file_name, initial_metadata, timestamp_tag)
            user_metadata = self.augment_metadata(folder_path, file_name, initial_metadata, timestamp_tag)
            extension_from_metadata = user_metadata.get(extension_tag, None)
        if user_metadata:
            self.write_object_to_file(user_metadata, metadata_file_path)
@@ -627,6 +631,7 @@ def rename_file(

    def augment_metadata(
        self,
        folder_path: str,
        document_name: str,
        input_metadata: dict,
        timestamp_tag: str = 'publishdate',
@@ -669,6 +674,11 @@
        initial_metadata[timestamp_tag] = formatted_date
        initial_metadata['year'] = year

        if self.detect_file_duplication:
            file_path = f"{folder_path}/{document_name}"
            file_hash = calculate_file_hash(file_path)
            initial_metadata[Defaults.FILE_HASH] = file_hash

        if self.source_base_url is not None and self.source_doc_id is not None:
            if doc_url is not None:
                initial_metadata['url'] = doc_url
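When detect_file_duplication is enabled, the metadata dictionary produced by augment_metadata carries the content hash next to the existing fields. A sketch of the resulting dict with made-up values (the keys come from the code above; Defaults.FILE_HASH resolves to "hash", and the actual date format depends on formatted_date):

# Illustrative only: values are invented.
user_metadata = {
    "publishdate": "2024-10-23",            # default timestamp_tag
    "year": 2024,
    "hash": "9f86d081884c7d65...",          # SHA-256 of the local file, truncated here
    "url": "https://example.com/doc/123",   # only set when source_base_url and source_doc_id are configured
}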
1 change: 1 addition & 0 deletions saia_ingest/config.py
@@ -57,3 +57,4 @@ class Defaults:
    PACKAGE_DESCRIPTION = "GeneXus Enterprise AI"
    PACKAGE_URL = "https://github.com/genexuslabs/saia-ingest/blob/main/README.md"
    PACKAGE_METADATA_POSTFIX = ".saia.metadata"
    FILE_HASH = "hash"
68 changes: 68 additions & 0 deletions saia_ingest/file_utils.py
@@ -0,0 +1,68 @@
import json
from pathlib import Path
from typing import Dict, Any
import hashlib
import logging

from saia_ingest.config import Defaults


def calculate_file_hash(filepath: Path) -> str:
    """Calculate the SHA-256 hash of a file."""
    sha256 = hashlib.sha256()
    try:
        with open(filepath, 'rb') as f:
            while chunk := f.read(4096):
                sha256.update(chunk)
    except FileNotFoundError:
        logging.getLogger().error(f"File not found: {filepath}")
        raise
    except PermissionError:
        logging.getLogger().error(f"Permission denied: {filepath}")
        raise
    return sha256.hexdigest()


def are_files_identical(file1: Path, file2: Path) -> bool:
    """Compare two files to see if they are identical by comparing their SHA-256 hashes."""
    try:
        return calculate_file_hash(file1) == calculate_file_hash(file2)
    except Exception as e:
        logging.getLogger().error(f"Error comparing files {file1} and {file2}: {e}")
        return False


def load_hashes_from_json(folder: Path) -> Dict[str, Any]:
    """Load all existing hashes from JSON files in the folder."""
    hash_index = {}
    for json_file in folder.glob("*.json"):
        try:
            with json_file.open('r') as f:
                data = json.load(f)
                if Defaults.FILE_HASH in data:
                    file_hash = data[Defaults.FILE_HASH]
                    document_id = data["documentid"]
                    if file_hash in hash_index:
                        logging.getLogger().warning(f"{document_id} duplicate detected: using {hash_index[file_hash]}")
                    else:
                        hash_index[file_hash] = document_id
        except (json.JSONDecodeError, FileNotFoundError) as e:
            print(f"Error reading {json_file}: {e}")
    return hash_index


def check_for_duplicates(new_file: Path, folder: Path) -> bool:
    """Check if the hash of a new file collides with existing hashes."""
    # Load all existing hashes
    hash_index = load_hashes_from_json(Path(folder))

    # Calculate the hash of the new file
    new_file_hash = calculate_file_hash(new_file)

    if new_file_hash in hash_index:
        print(f"Duplicate found! {new_file} matches {hash_index[new_file_hash]}")
        return True
    else:
        print(f"No duplicates found for {new_file}.")
        return False

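A minimal usage sketch of the new helpers; the paths are hypothetical, and check_for_duplicates expects the folder to contain *.json files carrying a "hash" and a "documentid" field:

from pathlib import Path
from saia_ingest.file_utils import calculate_file_hash, are_files_identical, check_for_duplicates

downloads = Path("./downloads")        # hypothetical folder holding per-document JSON metadata
candidate = downloads / "report.pdf"   # hypothetical file about to be ingested

print(calculate_file_hash(candidate))                         # SHA-256 hex digest of the file
print(are_files_identical(candidate, downloads / "old.pdf"))  # True when both digests match
if check_for_duplicates(candidate, downloads):                # scans downloads/*.json for a matching hash
    print("Skipping ingestion: content already uploaded")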
15 changes: 15 additions & 0 deletions saia_ingest/ingestor.py
@@ -1,4 +1,5 @@
import os
from pathlib import Path
import time
from datetime import datetime, timezone
import json
@@ -13,6 +14,7 @@
from langchain.text_splitter import RecursiveCharacterTextSplitter

from saia_ingest.config import Defaults
from saia_ingest.file_utils import calculate_file_hash, load_hashes_from_json
from saia_ingest.profile_utils import is_valid_profile, file_upload, file_delete, operation_log_upload, sync_failed_files, search_failed_to_delete
from saia_ingest.rag_api import RagApi
from saia_ingest.utils import get_yaml_config, get_metadata_file, load_json_file, search_failed_files, find_value_by_key
@@ -473,6 +475,7 @@ def ingest_s3(
    download_dir = s3_level.get('download_dir', None)
    verbose = s3_level.get('verbose', False)
    delete_downloaded_files = s3_level.get('delete_downloaded_files', False)
    detect_file_duplication = s3_level.get('detect_file_duplication', False)

    # Saia
    saia_level = config.get('saia', {})
@@ -510,6 +513,7 @@ def ingest_s3(
        source_doc_id=source_doc_id,
        alternative_document_service=alternative_document_service,
        download_dir=download_dir,
        detect_file_duplication=detect_file_duplication,
        verbose=verbose
    )
    loader.init_s3()
@@ -541,6 +545,17 @@ def ingest_s3(
    file_paths = loader.get_files() if loader.alternative_document_service is None else loader.get_files_from_url()

    saia_file_ids_to_delete = search_failed_to_delete(file_paths)
    if detect_file_duplication and len(file_paths) > 0:
        hash_index = load_hashes_from_json(Path(download_dir))
        for new_file in list(file_paths):  # iterate over a copy so removing duplicates below does not skip items
            new_file_hash = calculate_file_hash(new_file)
            if new_file_hash in hash_index:
                document_id = hash_index[new_file_hash]
                file_name = os.path.basename(new_file)
                saia_file_ids_to_delete.append(document_id)
                file_paths.remove(new_file)
                logging.getLogger().warning(f"{file_name} duplicate discarded, using {document_id}")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_executions) as executor:
        futures = [executor.submit(ragApi.delete_profile_document, id, saia_profile) for id in saia_file_ids_to_delete]
        concurrent.futures.wait(futures)
4 changes: 4 additions & 0 deletions saia_ingest/profile_utils.py
@@ -5,6 +5,8 @@
import requests
import urllib3
import json

from saia_ingest.file_utils import calculate_file_hash
from .log import AccumulatingLogHandler
from .config import DefaultHeaders, Defaults

@@ -80,6 +82,8 @@ def file_upload(
        ret = False
    else:
        if save_answer:
            file_crc = calculate_file_hash(file_path)
            response_body[Defaults.FILE_HASH] = file_crc
            with open(file_path + Defaults.PACKAGE_METADATA_POSTFIX, 'w') as file:
                file.write(json.dumps(response_body, indent=2))
    end_time = time.time()
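With this change, the <file>.saia.metadata sidecar written after a successful upload carries the content hash alongside the API response, which matches the shape load_hashes_from_json expects (a JSON object with "hash" and "documentid"). A sketch of such a sidecar, with invented values and the other response fields omitted:

import json

# Hypothetical content of "report.pdf.saia.metadata" after a successful upload.
sidecar = {
    "documentid": "a1b2c3d4",  # returned by the upload API (value invented)
    "hash": "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08",
}
print(json.dumps(sidecar, indent=2))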
