Skip to content

Commit

Permalink
minor improvements; feedback from Christophe; agnostic language about…
Browse files Browse the repository at this point in the history
… 'thread'-related constants
  • Loading branch information
hemidactylus committed Jul 28, 2024
1 parent f92cd89 commit c0e1e0f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
10 changes: 5 additions & 5 deletions libs/astradb/langchain_astradb/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from langchain_core.stores import BaseStore, ByteStore

from langchain_astradb.utils.astradb import (
INSERT_DOCUMENT_MAX_THREADS,
REPLACE_DOCUMENTS_MAX_THREADS,
MAX_CONCURRENT_DOCUMENT_INSERTIONS,
MAX_CONCURRENT_DOCUMENT_REPLACEMENTS,
SetupMode,
_AstraDBCollectionEnvironment,
)
Expand Down Expand Up @@ -94,7 +94,7 @@ def mset(self, key_value_pairs: Sequence[Tuple[str, V]]) -> None:
self.collection.insert_many(
documents_to_insert,
ordered=False,
concurrency=INSERT_DOCUMENT_MAX_THREADS,
concurrency=MAX_CONCURRENT_DOCUMENT_INSERTIONS,
)
ids_to_replace = []
except InsertManyException as err:
Expand All @@ -114,7 +114,7 @@ def mset(self, key_value_pairs: Sequence[Tuple[str, V]]) -> None:
]

with ThreadPoolExecutor(
max_workers=REPLACE_DOCUMENTS_MAX_THREADS
max_workers=MAX_CONCURRENT_DOCUMENT_REPLACEMENTS
) as executor:

def _replace_document(document: Dict[str, Any]) -> UpdateResult:
Expand Down Expand Up @@ -167,7 +167,7 @@ async def amset(self, key_value_pairs: Sequence[Tuple[str, V]]) -> None:
if document["_id"] in ids_to_replace
]

sem = asyncio.Semaphore(REPLACE_DOCUMENTS_MAX_THREADS)
sem = asyncio.Semaphore(MAX_CONCURRENT_DOCUMENT_REPLACEMENTS)

_async_collection = self.async_collection

Expand Down
6 changes: 3 additions & 3 deletions libs/astradb/langchain_astradb/utils/astradb.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
# Chunk size for many-document insertions (None meaning defer to astrapy):
DEFAULT_DOCUMENT_CHUNK_SIZE = None
# thread/coroutine count for bulk inserts
INSERT_DOCUMENT_MAX_THREADS = 20
MAX_CONCURRENT_DOCUMENT_INSERTIONS = 20
# Thread/coroutine count for one-doc-at-a-time overwrites
REPLACE_DOCUMENTS_MAX_THREADS = 20
MAX_CONCURRENT_DOCUMENT_REPLACEMENTS = 20
# Thread/coroutine count for one-doc-at-a-time deletes:
DELETE_DOCUMENTS_MAX_THREADS = 20
MAX_CONCURRENT_DOCUMENT_DELETIONS = 20

logger = logging.getLogger()

Expand Down
13 changes: 7 additions & 6 deletions libs/astradb/langchain_astradb/vectorstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@

from langchain_astradb.utils.astradb import (
DEFAULT_DOCUMENT_CHUNK_SIZE,
DELETE_DOCUMENTS_MAX_THREADS,
INSERT_DOCUMENT_MAX_THREADS,
REPLACE_DOCUMENTS_MAX_THREADS,
MAX_CONCURRENT_DOCUMENT_DELETIONS,
MAX_CONCURRENT_DOCUMENT_INSERTIONS,
MAX_CONCURRENT_DOCUMENT_REPLACEMENTS,
SetupMode,
_AstraDBCollectionEnvironment,
)
Expand Down Expand Up @@ -299,13 +299,13 @@ def __init__(
# Concurrency settings
self.batch_size: Optional[int] = batch_size or DEFAULT_DOCUMENT_CHUNK_SIZE
self.bulk_insert_batch_concurrency: int = (
bulk_insert_batch_concurrency or INSERT_DOCUMENT_MAX_THREADS
bulk_insert_batch_concurrency or MAX_CONCURRENT_DOCUMENT_INSERTIONS
)
self.bulk_insert_overwrite_concurrency: int = (
bulk_insert_overwrite_concurrency or REPLACE_DOCUMENTS_MAX_THREADS
bulk_insert_overwrite_concurrency or MAX_CONCURRENT_DOCUMENT_REPLACEMENTS
)
self.bulk_delete_concurrency: int = (
bulk_delete_concurrency or DELETE_DOCUMENTS_MAX_THREADS
bulk_delete_concurrency or MAX_CONCURRENT_DOCUMENT_DELETIONS
)
# "vector-related" settings
self.metric = metric
Expand Down Expand Up @@ -1626,6 +1626,7 @@ def from_texts(
Args:
texts: the texts to insert.
embedding: the embedding function to use in the store.
metadatas: metadata dicts for the texts.
ids: ids to associate to the texts.
**kwargs: you can pass any argument that you would
Expand Down

0 comments on commit c0e1e0f

Please sign in to comment.