Feature/detect file duplication #42

Merged
merged 6 commits into from
Oct 23, 2024
1 change: 1 addition & 0 deletions amazon_s3/s3_config.md
@@ -23,6 +23,7 @@ s3: # contact the provider for the following information
reprocess_failed_files_reference: !!str 'string' # Full path to a file or URL to Saia GetDocuments API
reprocess_valid_status_list: # List of Statuses to process, valid values Unknown, Starting, Failed, Pending, Success
delete_local_folder: !!bool True|False (default) # Delete temporary folder if created
detect_file_duplication: !!bool True|False (default) # generate and compare a file hash on upload, discarding duplicates
excluded_exts: # list of excluded extensions, by default it is suggested to include the following: raw, metadata
- !!str 'metadata'
- !!str 'raw'
12 changes: 11 additions & 1 deletion amazon_s3/s3reader.py
@@ -11,8 +11,10 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import quote, unquote
from saia_ingest.config import Defaults
from saia_ingest.utils import detect_file_extension
from saia_ingest.profile_utils import is_valid_profile, file_upload, file_delete, operation_log_upload, sync_failed_files, get_bearer_token, get_json_response_from_url
from saia_ingest.file_utils import calculate_file_hash

from llama_index import download_loader
from llama_index.readers.base import BaseReader
@@ -57,6 +59,7 @@ def __init__(
source_base_url: Optional[str] = None,
source_doc_id: Optional[str] = None,
alternative_document_service: Optional[Dict[str, str]] = None,
detect_file_duplication: Optional[bool] = False,
**kwargs: Any,
) -> None:
"""Initialize S3 bucket and key, along with credentials if needed.
@@ -118,6 +121,7 @@ def __init__(
self.alternative_document_service = alternative_document_service
self.source_base_url = source_base_url
self.source_doc_id = source_doc_id
self.detect_file_duplication = detect_file_duplication

self.s3 = None
self.s3_client = None
@@ -595,7 +599,7 @@ def rename_file(
s3_file = key_prefix + file_name
initial_metadata = self.get_metadata(s3_file)
if self.use_augment_metadata:
user_metadata = self.augment_metadata(file_name, initial_metadata, timestamp_tag)
user_metadata = self.augment_metadata(folder_path, file_name, initial_metadata, timestamp_tag)
extension_from_metadata = user_metadata.get(extension_tag, None)
if user_metadata:
self.write_object_to_file(user_metadata, metadata_file_path)
@@ -627,6 +631,7 @@ def rename_file(

def augment_metadata(
self,
folder_path: str,
document_name: str,
input_metadata: dict,
timestamp_tag: str = 'publishdate',
@@ -669,6 +674,11 @@
initial_metadata[timestamp_tag] = formatted_date
initial_metadata['year'] = year

if self.detect_file_duplication:
file_path = f"{folder_path}/{document_name}"
file_hash = calculate_file_hash(file_path)
initial_metadata[Defaults.FILE_HASH] = file_hash

if self.source_base_url is not None and self.source_doc_id is not None:
if doc_url is not None:
initial_metadata['url'] = doc_url
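Taken on its own, the s3reader change amounts to: when detect_file_duplication is enabled, hash the locally downloaded copy of the object and record the digest in the user metadata under Defaults.FILE_HASH. A minimal standalone sketch of that step, under the same assumptions as the diff above (the helper name add_hash_to_metadata is illustrative, not part of the PR):

from saia_ingest.config import Defaults
from saia_ingest.file_utils import calculate_file_hash

def add_hash_to_metadata(folder_path: str, document_name: str, metadata: dict) -> dict:
    # Hash the locally downloaded file and store the digest under the shared "hash" key,
    # mirroring the block added to augment_metadata above.
    file_path = f"{folder_path}/{document_name}"
    metadata[Defaults.FILE_HASH] = calculate_file_hash(file_path)
    return metadata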
1 change: 1 addition & 0 deletions saia_ingest/config.py
@@ -57,3 +57,4 @@ class Defaults:
PACKAGE_DESCRIPTION = "GeneXus Enterprise AI"
PACKAGE_URL = "https://github.com/genexuslabs/saia-ingest/blob/main/README.md"
PACKAGE_METADATA_POSTFIX = ".saia.metadata"
FILE_HASH = "hash"
68 changes: 68 additions & 0 deletions saia_ingest/file_utils.py
@@ -0,0 +1,68 @@
import json
from pathlib import Path
from typing import Dict, Any
import hashlib
import logging

from saia_ingest.config import Defaults


def calculate_file_hash(filepath: Path) -> str:
    """Calculate the SHA-256 hash of a file."""
    sha256 = hashlib.sha256()
    try:
        with open(filepath, 'rb') as f:
            while chunk := f.read(4096):
                sha256.update(chunk)
    except FileNotFoundError:
        logging.getLogger().error(f"File not found: {filepath}")
        raise
    except PermissionError:
        logging.getLogger().error(f"Permission denied: {filepath}")
        raise
    return sha256.hexdigest()


def are_files_identical(file1: Path, file2: Path) -> bool:
    """Compare two files by checking whether their SHA-256 hashes match."""
    try:
        return calculate_file_hash(file1) == calculate_file_hash(file2)
    except Exception as e:
        logging.getLogger().error(f"Error comparing files {file1} and {file2}: {e}")
        return False


def load_hashes_from_json(folder: Path) -> Dict[str, Any]:
    """Load all existing hashes from JSON files in the folder."""
    hash_index = {}
    for json_file in folder.glob("*.json"):
        try:
            with json_file.open('r') as f:
                data = json.load(f)
                if Defaults.FILE_HASH in data:
                    file_hash = data[Defaults.FILE_HASH]
                    document_id = data["documentid"]
                    if file_hash in hash_index:
                        logging.getLogger().warning(f"{document_id} duplicate detected: using {hash_index[file_hash]}")
                    else:
                        hash_index[file_hash] = document_id
        except (json.JSONDecodeError, FileNotFoundError) as e:
            logging.getLogger().error(f"Error reading {json_file}: {e}")
    return hash_index


def check_for_duplicates(new_file: Path, folder: Path) -> bool:
    """Check if the hash of a new file collides with existing hashes."""
    # Load all existing hashes
    hash_index = load_hashes_from_json(Path(folder))

    # Calculate the hash of the new file
    new_file_hash = calculate_file_hash(new_file)

    if new_file_hash in hash_index:
        logging.getLogger().warning(f"Duplicate found: {new_file} matches {hash_index[new_file_hash]}")
        return True
    else:
        logging.getLogger().info(f"No duplicates found for {new_file}.")
        return False

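A usage sketch for the helpers above, assuming a local folder that already holds *.json metadata files carrying 'hash' and 'documentid' entries; the paths and file names below are illustrative:

from pathlib import Path

from saia_ingest.file_utils import calculate_file_hash, check_for_duplicates, load_hashes_from_json

download_dir = Path("./downloads")        # illustrative local folder
candidate = download_dir / "report.pdf"   # illustrative new file

# One-shot check against every metadata JSON in the folder
if check_for_duplicates(candidate, download_dir):
    print(f"{candidate} is already indexed, skip the upload")

# When checking many files, build the hash index once and reuse it
hash_index = load_hashes_from_json(download_dir)
candidate_hash = calculate_file_hash(candidate)
if candidate_hash in hash_index:
    print(f"duplicate of document {hash_index[candidate_hash]}")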
15 changes: 15 additions & 0 deletions saia_ingest/ingestor.py
@@ -1,4 +1,5 @@
import os
from pathlib import Path
import time
from datetime import datetime, timezone
import json
@@ -13,6 +14,7 @@
from langchain.text_splitter import RecursiveCharacterTextSplitter

from saia_ingest.config import Defaults
from saia_ingest.file_utils import calculate_file_hash, load_hashes_from_json
from saia_ingest.profile_utils import is_valid_profile, file_upload, file_delete, operation_log_upload, sync_failed_files, search_failed_to_delete
from saia_ingest.rag_api import RagApi
from saia_ingest.utils import get_yaml_config, get_metadata_file, load_json_file, search_failed_files, find_value_by_key
@@ -473,6 +475,7 @@ def ingest_s3(
download_dir = s3_level.get('download_dir', None)
verbose = s3_level.get('verbose', False)
delete_downloaded_files = s3_level.get('delete_downloaded_files', False)
detect_file_duplication = s3_level.get('detect_file_duplication', False)

# Saia
saia_level = config.get('saia', {})
@@ -510,6 +513,7 @@
source_doc_id=source_doc_id,
alternative_document_service=alternative_document_service,
download_dir=download_dir,
detect_file_duplication=detect_file_duplication,
verbose=verbose
)
loader.init_s3()
@@ -541,6 +545,17 @@
file_paths = loader.get_files() if loader.alternative_document_service is None else loader.get_files_from_url()

saia_file_ids_to_delete = search_failed_to_delete(file_paths)
if detect_file_duplication and len(file_paths) > 0:
    hash_index = load_hashes_from_json(Path(download_dir))
    for new_file in list(file_paths):  # iterate over a copy; duplicates are removed from file_paths below
        new_file_hash = calculate_file_hash(new_file)
        if new_file_hash in hash_index:
            document_id = hash_index[new_file_hash]
            file_name = os.path.basename(new_file)
            saia_file_ids_to_delete.append(document_id)
            file_paths.remove(new_file)
            logging.getLogger().warning(f"{file_name} duplicate discarded, using {document_id}")

with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_executions) as executor:
futures = [executor.submit(ragApi.delete_profile_document, id, saia_profile) for id in saia_file_ids_to_delete]
concurrent.futures.wait(futures)
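The loop above iterates over a copy of file_paths so duplicate entries can be dropped in place; an alternative that avoids mutating the list altogether is sketched below (the helper name split_duplicates is illustrative, not part of the merged diff):

import logging
import os
from typing import Dict, List, Tuple

from saia_ingest.file_utils import calculate_file_hash


def split_duplicates(file_paths: List[str], hash_index: Dict[str, str]) -> Tuple[List[str], List[str]]:
    # Return (files to keep, document ids of already-ingested duplicates) without mutating file_paths
    kept_paths, duplicate_ids = [], []
    for new_file in file_paths:
        file_hash = calculate_file_hash(new_file)
        if file_hash in hash_index:
            duplicate_ids.append(hash_index[file_hash])
            logging.getLogger().warning(f"{os.path.basename(new_file)} duplicate discarded, using {hash_index[file_hash]}")
        else:
            kept_paths.append(new_file)
    return kept_paths, duplicate_ids

# Usage inside ingest_s3 (illustrative):
#   file_paths, duplicate_ids = split_duplicates(file_paths, hash_index)
#   saia_file_ids_to_delete.extend(duplicate_ids)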
4 changes: 4 additions & 0 deletions saia_ingest/profile_utils.py
@@ -5,6 +5,8 @@
import requests
import urllib3
import json

from saia_ingest.file_utils import calculate_file_hash
from .log import AccumulatingLogHandler
from .config import DefaultHeaders, Defaults

@@ -80,6 +82,8 @@ def file_upload(
ret = False
else:
if save_answer:
file_crc = calculate_file_hash(file_path)
response_body[Defaults.FILE_HASH] = file_crc
with open(file_path + Defaults.PACKAGE_METADATA_POSTFIX, 'w') as file:
file.write(json.dumps(response_body, indent=2))
end_time = time.time()
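With this change, the answer saved next to the uploaded file (the Defaults.PACKAGE_METADATA_POSTFIX sidecar) also carries the SHA-256 digest, which is what load_hashes_from_json later pairs with the document id. Roughly, the sidecar content looks like the dict below, shown as Python for illustration; only the "hash" key is introduced by this PR, and all values here are illustrative:

sidecar = {
    "documentid": "0f4c1e2a-...",   # id returned by the upload API (illustrative value)
    "hash": "9f86d081884c7d65...",  # SHA-256 hex digest of the uploaded file (illustrative, truncated)
}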