Skip to content

Commit

Permalink
improve logic to support skip
Browse files Browse the repository at this point in the history
  • Loading branch information
anakin87 committed Nov 20, 2023
1 parent bb0b62c commit 8ff464c
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,17 +243,26 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D

if errors:
duplicate_errors_ids = []
if policy == DuplicatePolicy.FAIL:
for e in errors:
if e["create"]["error"]["type"] == "version_conflict_engine_exception":
other_errors = []
for e in errors:
if e["create"]["error"]["type"] == "version_conflict_engine_exception":
if policy == DuplicatePolicy.FAIL:
duplicate_errors_ids.append(e["create"]["_id"])
elif policy == DuplicatePolicy.SKIP:
# when the policy is skip, these errors are OK and we should not raise an exception
continue
else:
other_errors.append(e)
else:
other_errors.append(e)

if len(duplicate_errors_ids) > 0:
msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store."
raise DuplicateDocumentError(msg)

msg = f"Failed to write documents to Elasticsearch. Errors:\n{errors}"
raise DocumentStoreError(msg)
if len(other_errors) > 0:
msg = f"Failed to write documents to Elasticsearch. Errors:\n{other_errors}"
raise DocumentStoreError(msg)

def _deserialize_document(self, hit: Dict[str, Any]) -> Document:
"""
Expand Down

0 comments on commit 8ff464c

Please sign in to comment.