Skip to content

Commit

Permalink
Ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
docuracy committed Jan 25, 2025
1 parent 580b03d commit 4530704
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
6 changes: 3 additions & 3 deletions vespa/repository/api/ingestion/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .triples import feed_triple, process_variants
from ..bcp_47.bcp_47 import bcp47_fields
from ..config import VespaClient
from ..utils import task_tracker, get_uuid, escape_yql
from ..utils import task_tracker, get_uuid, escape_yql, distinct_dicts

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,7 +52,7 @@ def queue_worker():
time.sleep(3 * 60)

except Exception as e:
logger.error(f"Error processing update: {e}")
logger.error(f"Error processing update: {e}", exc_info=True)

finally:
update_queue.task_done()
Expand All @@ -75,7 +75,7 @@ def update_existing_place(task):
schema=schema,
data_id=document_id,
fields={
"names": list(set(existing_names + transformed_document['fields']['names']))
"names": distinct_dicts(existing_names, transformed_document['fields']['names'])
}
)
# logger.info(f"Update response: {response.get('status_code')}: {response}")
Expand Down
40 changes: 40 additions & 0 deletions vespa/repository/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,43 @@ def existing_document(query_response_root):
'document_id': document.get("documentid").split("::")[-1],
'fields': document,
}

def distinct_dicts(existing_dicts, new_dicts):
    """
    Merge two collections of dictionaries, removing duplicates based on all fields.

    Order is preserved: entries from ``existing_dicts`` come first, and the
    first occurrence of any duplicate is the one kept.

    Args:
        existing_dicts (list or dict): List of existing dictionaries or a single dictionary.
        new_dicts (list or dict): List of new dictionaries or a single dictionary to be merged.

    Returns:
        list: A list of unique dictionaries, with duplicates removed based on all fields.

    Raises:
        ValueError: If either argument is neither a dict nor a list.
    """
    import json  # local import keeps this helper self-contained

    # Ensure both inputs are lists (wrap single dicts as lists)
    if isinstance(existing_dicts, dict):
        existing_dicts = [existing_dicts]
    if isinstance(new_dicts, dict):
        new_dicts = [new_dicts]

    # Ensure both inputs are lists now
    if not isinstance(existing_dicts, list) or not isinstance(new_dicts, list):
        raise ValueError("Both existing_dicts and new_dicts must be lists or single dictionaries.")

    # Set of canonical keys seen so far, and the deduplicated output.
    seen = set()
    unique_dicts = []

    for dictionary in existing_dicts + new_dicts:
        # Canonical JSON (sorted keys) gives a hashable dedup key even when
        # values are nested lists/dicts, where frozenset(d.items()) would
        # raise TypeError on the unhashable values. ``default=str`` covers
        # any non-JSON-serializable leaf values deterministically.
        key = json.dumps(dictionary, sort_keys=True, default=str)

        # Keep only the first occurrence of each distinct dictionary.
        if key not in seen:
            seen.add(key)
            unique_dicts.append(dictionary)

    return unique_dicts

0 comments on commit 4530704

Please sign in to comment.