Skip to content

Commit

Permalink
better handling of errors during insert many (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
hemidactylus authored Sep 20, 2024
1 parent 0d4220d commit 80ea19c
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions libs/astradb/langchain_astradb/vectorstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@

# indexing options when creating a collection
# NOTE(review): presumably this restricts indexing to the "metadata" field;
# confirm against the Astra DB collection-indexing documentation.
DEFAULT_INDEXING_OPTIONS = {"allow": ["metadata"]}
# API error code to check for during bulk insertions: a failed insert-many is
# recovered from (by replacing the documents that were not inserted) only when
# every reported error carries this code; any other failure is re-raised.
DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE = "DOCUMENT_ALREADY_EXISTS"

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -983,13 +985,21 @@ def add_texts(
ids_to_replace = []
inserted_ids = insert_many_result.inserted_ids
except InsertManyException as err:
inserted_ids = err.partial_result.inserted_ids
inserted_ids_set = set(inserted_ids)
ids_to_replace = [
document["_id"]
for document in documents_to_insert
if document["_id"] not in inserted_ids_set
]
# check that the error is solely due to already-existing documents
error_codes = {
getattr(err_desc, "error_code", None)
for err_desc in err.error_descriptors
}
if error_codes == {DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}:
inserted_ids = err.partial_result.inserted_ids
inserted_ids_set = set(inserted_ids)
ids_to_replace = [
document["_id"]
for document in documents_to_insert
if document["_id"] not in inserted_ids_set
]
else:
raise

# if necessary, replace docs for the non-inserted ids
if ids_to_replace:
Expand Down Expand Up @@ -1107,13 +1117,21 @@ async def aadd_texts(
ids_to_replace = []
inserted_ids = insert_many_result.inserted_ids
except InsertManyException as err:
inserted_ids = err.partial_result.inserted_ids
inserted_ids_set = set(inserted_ids)
ids_to_replace = [
document["_id"]
for document in documents_to_insert
if document["_id"] not in inserted_ids_set
]
# check that the error is solely due to already-existing documents
error_codes = {
getattr(err_desc, "error_code", None)
for err_desc in err.error_descriptors
}
if error_codes == {DOCUMENT_ALREADY_EXISTS_API_ERROR_CODE}:
inserted_ids = err.partial_result.inserted_ids
inserted_ids_set = set(inserted_ids)
ids_to_replace = [
document["_id"]
for document in documents_to_insert
if document["_id"] not in inserted_ids_set
]
else:
raise

# if necessary, replace docs for the non-inserted ids
if ids_to_replace:
Expand Down

0 comments on commit 80ea19c

Please sign in to comment.