From d27273cb2bf818ccbaeb461e4c84820c4f95a3c9 Mon Sep 17 00:00:00 2001 From: Stefano Lottini Date: Fri, 29 Mar 2024 03:30:09 +0100 Subject: [PATCH] logging, create_database signature (#271) --- astrapy/admin.py | 75 +++++++-- astrapy/client.py | 7 + astrapy/collection.py | 142 ++++++++++++++++-- astrapy/cursors.py | 12 ++ astrapy/database.py | 64 ++++++-- tests/idiomatic/integration/test_admin.py | 27 +++- tests/idiomatic/integration/test_ddl_async.py | 14 +- tests/idiomatic/integration/test_ddl_sync.py | 14 +- tests/idiomatic/integration/test_dml_async.py | 38 ++++- tests/idiomatic/integration/test_dml_sync.py | 38 ++++- 10 files changed, 373 insertions(+), 58 deletions(-) diff --git a/astrapy/admin.py b/astrapy/admin.py index 1b8fb9d2..5e05591c 100644 --- a/astrapy/admin.py +++ b/astrapy/admin.py @@ -14,6 +14,7 @@ from __future__ import annotations +import logging import re import time from abc import ABC, abstractmethod @@ -38,8 +39,9 @@ from astrapy import AsyncDatabase, Database -DEFAULT_NEW_DATABASE_CLOUD_PROVIDER = "gcp" -DEFAULT_NEW_DATABASE_REGION = "us-east1" +logger = logging.getLogger(__name__) + + DATABASE_POLL_NAMESPACE_SLEEP_TIME = 2 DATABASE_POLL_SLEEP_TIME = 15 @@ -425,6 +427,7 @@ def set_caller( ... ) """ + logger.info(f"setting caller to {caller_name}/{caller_version}") self._caller_name = caller_name self._caller_version = caller_version self._astra_db_ops.set_caller(caller_name, caller_version) @@ -458,9 +461,11 @@ def list_databases( 'eu-west-1' """ + logger.info("getting databases") gd_list_response = self._astra_db_ops.get_databases( timeout_info=base_timeout_info(max_time_ms) ) + logger.info("finished getting databases") if not isinstance(gd_list_response, list): raise DevOpsAPIException( "Faulty response from get-databases DevOps API command.", ) @@ -503,10 +508,12 @@ def database_info( 'eu-west-1' """ + logger.info(f"getting database info for {id}") gd_response = self._astra_db_ops.get_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished getting database info for {id}") if not isinstance(gd_response, dict): raise DevOpsAPIException( "Faulty response from get-database DevOps API command.", ) @@ -522,9 +529,10 @@ def create_database( self, name: str, *, + cloud_provider: str, + region: str, + namespace: Optional[str] = None, wait_until_active: bool = True, - cloud_provider: str = DEFAULT_NEW_DATABASE_CLOUD_PROVIDER, - region: str = DEFAULT_NEW_DATABASE_REGION, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ @@ -532,13 +540,15 @@ def create_database( Args: name: the desired name for the database. + cloud_provider: one of 'aws', 'gcp' or 'azure'. + region: any of the available cloud regions. + namespace: name for the one namespace the database starts with. + If omitted, the DevOps API will use its default. wait_until_active: if True (default), the method returns only after the newly-created database is in ACTIVE state (a few minutes, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be the responsibility of the caller to check the database status before working with it. - cloud_provider: one of 'aws', 'gcp' (default) or 'azure'. - region: any of the available cloud regions (default: 'us-east1'). max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete.
Note that a timeout is no guarantee that the creation request @@ -559,25 +569,36 @@ def create_database( """ database_definition = { - "name": name, - "tier": "serverless", - "cloudProvider": cloud_provider, - "region": region, - "capacityUnits": 1, - "dbType": "vector", + k: v + for k, v in { + "name": name, + "tier": "serverless", + "cloudProvider": cloud_provider, + "region": region, + "capacityUnits": 1, + "dbType": "vector", + "keyspace": namespace, + }.items() + if v is not None } timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) + logger.info(f"creating database {name}/({cloud_provider}, {region})") cd_response = self._astra_db_ops.create_database( database_definition=database_definition, timeout_info=base_timeout_info(max_time_ms), ) + logger.info( + "devops api returned from creating database " + f"{name}/({cloud_provider}, {region})" + ) if cd_response is not None and "id" in cd_response: new_database_id = cd_response["id"] if wait_until_active: last_status_seen = STATUS_PENDING while last_status_seen in {STATUS_PENDING, STATUS_INITIALIZING}: + logger.info(f"sleeping to poll for status of '{new_database_id}'") time.sleep(DATABASE_POLL_SLEEP_TIME) last_status_seen = self.database_info( id=new_database_id, @@ -588,6 +609,10 @@ def create_database( f"Database {name} entered unexpected status {last_status_seen} after PENDING" ) # return the database instance + logger.info( + f"finished creating database '{new_database_id}' = " + f"{name}/({cloud_provider}, {region})" + ) return AstraDBDatabaseAdmin.from_astra_db_admin( id=new_database_id, astra_db_admin=self, @@ -638,15 +663,18 @@ def drop_database( timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) + logger.info(f"dropping database '{id}'") te_response = self._astra_db_ops.terminate_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"devops api returned from dropping database '{id}'") if te_response == id: if wait_until_active: last_status_seen: Optional[str] = STATUS_TERMINATING _db_name: Optional[str] = None while last_status_seen == STATUS_TERMINATING: + logger.info(f"sleeping to poll for status of '{id}'") time.sleep(DATABASE_POLL_SLEEP_TIME) # detected_databases = [ @@ -666,6 +694,7 @@ def drop_database( raise DevOpsAPIException( f"Database {id}{_name_desc} entered unexpected status {last_status_seen} after PENDING" ) + logger.info(f"finished dropping database '{id}'") return {"ok": 1} else: raise DevOpsAPIException( @@ -1022,6 +1051,7 @@ def set_caller( ... 
) """ + logger.info(f"setting caller to {caller_name}/{caller_version}") self._astra_db_admin.set_caller(caller_name, caller_version) @staticmethod @@ -1144,10 +1174,13 @@ def info(self, *, max_time_ms: Optional[int] = None) -> AdminDatabaseInfo: 'us-east1' """ - return self._astra_db_admin.database_info( # type: ignore[no-any-return] + logger.info(f"getting info ('{self.id}')") + req_response = self._astra_db_admin.database_info( id=self.id, max_time_ms=max_time_ms, ) + logger.info(f"finished getting info ('{self.id}')") + return req_response # type: ignore[no-any-return] def list_namespaces(self, *, max_time_ms: Optional[int] = None) -> List[str]: """ @@ -1164,7 +1197,9 @@ def list_namespaces(self, *, max_time_ms: Optional[int] = None) -> List[str]: ['default_keyspace', 'staging_namespace'] """ + logger.info(f"getting namespaces ('{self.id}')") info = self.info(max_time_ms=max_time_ms) + logger.info(f"finished getting namespaces ('{self.id}')") if info.raw_info is None: raise DevOpsAPIException("Could not get the namespace list.") else: @@ -1213,15 +1248,20 @@ def create_namespace( timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) + logger.info(f"creating namespace '{name}' on '{self.id}'") cn_response = self._astra_db_admin._astra_db_ops.create_keyspace( database=self.id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) + logger.info( + f"devops api returned from creating namespace '{name}' on '{self.id}'" + ) if cn_response is not None and name == cn_response.get("name"): if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: + logger.info(f"sleeping to poll for status of '{self.id}'") time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_status_seen = self.info( max_time_ms=timeout_manager.remaining_timeout_ms(), @@ -1233,6 +1273,7 @@ def create_namespace( # is the namespace found? if name not in self.list_namespaces(): raise DevOpsAPIException("Could not create the namespace.") + logger.info(f"finished creating namespace '{name}' on '{self.id}'") return {"ok": 1} else: raise DevOpsAPIException( @@ -1281,15 +1322,20 @@ def drop_namespace( timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) + logger.info(f"dropping namespace '{name}' on '{self.id}'") dk_response = self._astra_db_admin._astra_db_ops.delete_keyspace( database=self.id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) + logger.info( + f"devops api returned from dropping namespace '{name}' on '{self.id}'" + ) if dk_response == name: if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: + logger.info(f"sleeping to poll for status of '{self.id}'") time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_status_seen = self.info( max_time_ms=timeout_manager.remaining_timeout_ms(), @@ -1301,6 +1347,7 @@ def drop_namespace( # is the namespace found? if name in self.list_namespaces(): raise DevOpsAPIException("Could not drop the namespace.") + logger.info(f"finished dropping namespace '{name}' on '{self.id}'") return {"ok": 1} else: raise DevOpsAPIException( @@ -1350,11 +1397,13 @@ def drop( which avoids using a deceased database any further. 
""" + logger.info(f"dropping this database ('{self.id}')") return self._astra_db_admin.drop_database( # type: ignore[no-any-return] id=self.id, wait_until_active=wait_until_active, max_time_ms=max_time_ms, ) + logger.info(f"finished dropping this database ('{self.id}')") def get_database( self, diff --git a/astrapy/client.py b/astrapy/client.py index 6dfbf602..5d55f36e 100644 --- a/astrapy/client.py +++ b/astrapy/client.py @@ -14,6 +14,7 @@ from __future__ import annotations +import logging import re from typing import Any, Dict, Optional, TYPE_CHECKING @@ -32,6 +33,9 @@ from astrapy.admin import AstraDBAdmin +logger = logging.getLogger(__name__) + + class DataAPIClient: """ A client for using the Data API. This is the main entry point and sits @@ -180,6 +184,7 @@ def set_caller( >>> my_client.set_caller(caller_name="the_caller", caller_version="0.1.0") """ + logger.info(f"setting caller to {caller_name}/{caller_version}") self._caller_name = caller_name self._caller_version = caller_version @@ -243,12 +248,14 @@ def get_database( _region = region else: if this_db_info is None: + logger.info(f"fetching raw database info for {id}") this_db_info = fetch_raw_database_info_from_id_token( id=id, token=self.token, environment=self.environment, max_time_ms=max_time_ms, ) + logger.info(f"finished fetching raw database info for {id}") _region = this_db_info["info"]["region"] _token = token or self.token diff --git a/astrapy/collection.py b/astrapy/collection.py index daa689a2..3932f197 100644 --- a/astrapy/collection.py +++ b/astrapy/collection.py @@ -16,6 +16,7 @@ import asyncio import json +import logging from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, TYPE_CHECKING @@ -64,6 +65,9 @@ from astrapy.operations import AsyncBaseOperation, BaseOperation +logger = logging.getLogger(__name__) + + DEFAULT_INSERT_MANY_CONCURRENCY = 20 DEFAULT_BULK_WRITE_CONCURRENCY = 10 @@ -342,6 +346,7 @@ def set_caller( >>> my_coll.set_caller(caller_name="the_caller", caller_version="0.1.0") """ + logger.info(f"setting caller to {caller_name}/{caller_version}") self._astra_db_collection.set_caller( caller_name=caller_name, caller_version=caller_version, @@ -367,11 +372,13 @@ def options(self, *, max_time_ms: Optional[int] = None) -> CollectionOptions: CollectionOptions(vector=CollectionVectorOptions(dimension=3, metric='cosine')) """ + logger.info(f"getting collections in search of '{self.name}'") self_descriptors = [ coll_desc for coll_desc in self.database.list_collections(max_time_ms=max_time_ms) if coll_desc.name == self.name ] + logger.info(f"finished getting collections in search of '{self.name}'") if self_descriptors: return self_descriptors[0].options # type: ignore[no-any-return] else: @@ -511,10 +518,12 @@ def insert_one( """ _document = _collate_vector_to_document(document, vector) + logger.info(f"inserting one document in '{self.name}'") io_response = self._astra_db_collection.insert_one( _document, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished inserting one document in '{self.name}'") if "insertedIds" in io_response.get("status", {}): if io_response["status"]["insertedIds"]: inserted_id = io_response["status"]["insertedIds"][0] @@ -648,18 +657,21 @@ def insert_many( else: _chunk_size = chunk_size _documents = _collate_vectors_to_documents(documents, vectors) + logger.info(f"inserting {len(_documents)} documents in '{self.name}'") raw_results: List[Dict[str, Any]] = [] timeout_manager = 
MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) if ordered: options = {"ordered": True} inserted_ids: List[Any] = [] for i in range(0, len(_documents), _chunk_size): + logger.info(f"inserting a chunk of documents in '{self.name}'") chunk_response = self._astra_db_collection.insert_many( documents=_documents[i : i + _chunk_size], options=options, partial_failures_allowed=True, timeout_info=timeout_manager.remaining_timeout_info(), ) + logger.info(f"finished inserting a chunk of documents in '{self.name}'") # accumulate the results in this call chunk_inserted_ids = (chunk_response.get("status") or {}).get( "insertedIds", [] @@ -683,6 +695,9 @@ def insert_many( raw_results=raw_results, inserted_ids=inserted_ids, ) + logger.info( + f"finished inserting {len(_documents)} documents in '{self.name}'" + ) return full_result else: @@ -694,12 +709,17 @@ def insert_many( def _chunk_insertor( document_chunk: List[Dict[str, Any]] ) -> Dict[str, Any]: - return self._astra_db_collection.insert_many( + logger.info(f"inserting a chunk of documents in '{self.name}'") + im_response = self._astra_db_collection.insert_many( documents=document_chunk, options=options, partial_failures_allowed=True, timeout_info=timeout_manager.remaining_timeout_info(), ) + logger.info( + f"finished inserting a chunk of documents in '{self.name}'" + ) + return im_response raw_results = list( executor.map( @@ -711,15 +731,19 @@ def _chunk_insertor( ) ) else: - raw_results = [ - self._astra_db_collection.insert_many( - _documents[i : i + _chunk_size], - options=options, - partial_failures_allowed=True, - timeout_info=timeout_manager.remaining_timeout_info(), + for i in range(0, len(_documents), _chunk_size): + logger.info(f"inserting a chunk of documents in '{self.name}'") + raw_results.append( + self._astra_db_collection.insert_many( + _documents[i : i + _chunk_size], + options=options, + partial_failures_allowed=True, + timeout_info=timeout_manager.remaining_timeout_info(), + ) + ) + logger.info( + f"finished inserting a chunk of documents in '{self.name}'" ) - for i in range(0, len(_documents), _chunk_size) - ] # recast raw_results inserted_ids = [ inserted_id @@ -748,6 +772,9 @@ def _chunk_insertor( raw_results=raw_results, inserted_ids=inserted_ids, ) + logger.info( + f"finished inserting {len(_documents)} documents in '{self.name}'" + ) return full_result def find( @@ -1134,10 +1161,12 @@ def count_documents( by this method if this limit is encountered. 
""" + logger.info("calling count_documents") cd_response = self._astra_db_collection.count_documents( filter=filter, timeout_info=base_timeout_info(max_time_ms), ) + logger.info("finished calling count_documents") if "count" in cd_response.get("status", {}): count: int = cd_response["status"]["count"] if cd_response["status"].get("moreData", False): @@ -1255,6 +1284,7 @@ def find_one_and_replace( "returnDocument": return_document, "upsert": upsert, } + logger.info(f"calling find_one_and_replace on '{self.name}'") fo_response = self._astra_db_collection.find_one_and_replace( replacement=replacement, filter=filter, @@ -1263,6 +1293,7 @@ def find_one_and_replace( options=options, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_replace on '{self.name}'") if "document" in fo_response.get("data", {}): ret_document = fo_response.get("data", {}).get("document") if ret_document is None: @@ -1336,6 +1367,7 @@ def replace_one( options = { "upsert": upsert, } + logger.info(f"calling find_one_and_replace on '{self.name}'") fo_response = self._astra_db_collection.find_one_and_replace( replacement=replacement, filter=filter, @@ -1343,6 +1375,7 @@ def replace_one( options=options, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_replace on '{self.name}'") if "document" in fo_response.get("data", {}): fo_status = fo_response.get("status") or {} _update_info = _prepare_update_info([fo_status]) @@ -1458,6 +1491,7 @@ def find_one_and_update( "returnDocument": return_document, "upsert": upsert, } + logger.info(f"calling find_one_and_update on '{self.name}'") fo_response = self._astra_db_collection.find_one_and_update( update=update, filter=filter, @@ -1466,6 +1500,7 @@ def find_one_and_update( options=options, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_update on '{self.name}'") if "document" in fo_response.get("data", {}): ret_document = fo_response.get("data", {}).get("document") if ret_document is None: @@ -1543,6 +1578,7 @@ def update_one( options = { "upsert": upsert, } + logger.info(f"calling find_one_and_update on '{self.name}'") fo_response = self._astra_db_collection.find_one_and_update( update=update, sort=_sort, @@ -1550,6 +1586,7 @@ def update_one( options=options, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_update on '{self.name}'") if "document" in fo_response.get("data", {}): fo_status = fo_response.get("status") or {} _update_info = _prepare_update_info([fo_status]) @@ -1631,14 +1668,17 @@ def update_many( um_statuses: List[Dict[str, Any]] = [] must_proceed = True timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) + logger.info(f"starting update_many on '{self.name}'") while must_proceed: options = {**base_options, **page_state_options} + logger.info(f"calling update_many on '{self.name}'") this_um_response = self._astra_db_collection.update_many( update=update, filter=filter, options=options, timeout_info=timeout_manager.remaining_timeout_info(), ) + logger.info(f"finished calling update_many on '{self.name}'") this_um_status = this_um_response.get("status") or {} # # if errors, quit early @@ -1671,6 +1711,7 @@ def update_many( page_state_options = {} update_info = _prepare_update_info(um_statuses) + logger.info(f"finished update_many on '{self.name}'") return UpdateResult( raw_results=um_responses, update_info=update_info, @@ -1742,12 +1783,14 @@ def find_one_and_delete( _sort = 
_collate_vector_to_sort(sort, vector) _projection = normalize_optional_projection(projection) + logger.info(f"calling find_one_and_delete on '{self.name}'") fo_response = self._astra_db_collection.find_one_and_delete( sort=_sort, filter=filter, projection=_projection, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_delete on '{self.name}'") if "document" in fo_response.get("data", {}): document = fo_response["data"]["document"] return document # type: ignore[no-any-return] @@ -1818,9 +1861,11 @@ def delete_one( """ _sort = _collate_vector_to_sort(sort, vector) + logger.info(f"calling delete_one_by_predicate on '{self.name}'") do_response = self._astra_db_collection.delete_one_by_predicate( filter=filter, timeout_info=base_timeout_info(max_time_ms), sort=_sort ) + logger.info(f"finished calling delete_one_by_predicate on '{self.name}'") if "deletedCount" in do_response.get("status", {}): deleted_count = do_response["status"]["deletedCount"] if deleted_count == -1: @@ -1895,12 +1940,15 @@ def delete_many( deleted_count = 0 must_proceed = True timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) + logger.info(f"starting delete_many on '{self.name}'") while must_proceed: + logger.info(f"calling delete_many on '{self.name}'") this_dm_response = self._astra_db_collection.delete_many( filter=filter, skip_error_check=True, timeout_info=timeout_manager.remaining_timeout_info(), ) + logger.info(f"finished calling delete_many on '{self.name}'") # if errors, quit early if this_dm_response.get("errors", []): partial_result = DeleteResult( @@ -1924,6 +1972,7 @@ def delete_many( deleted_count += this_dc must_proceed = this_dm_response.get("status", {}).get("moreData", False) + logger.info(f"finished delete_many on '{self.name}'") return DeleteResult( deleted_count=deleted_count, raw_results=dm_responses, ) @@ -1954,9 +2003,11 @@ def delete_all(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]: Use with caution. """ + logger.info(f"calling unfiltered delete_many on '{self.name}'") dm_response = self._astra_db_collection.delete_many( filter={}, timeout_info=base_timeout_info(max_time_ms) ) + logger.info(f"finished calling unfiltered delete_many on '{self.name}'") deleted_count = dm_response["status"]["deletedCount"] if deleted_count == -1: return {"ok": 1} @@ -2030,6 +2081,7 @@ def bulk_write( if _concurrency > 1 and ordered: raise ValueError("Cannot run ordered bulk_write concurrently.") timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) + logger.info(f"starting a bulk write on '{self.name}'") if ordered: bulk_write_results: List[BulkWriteResult] = [] for operation_i, operation in enumerate(requests): @@ -2072,6 +2124,7 @@ def bulk_write( exceptions=[dar_exception], ) full_bw_result = reduce_bulk_write_results(bulk_write_results) + logger.info(f"finished a bulk write on '{self.name}'") return full_bw_result else: @@ -2134,6 +2187,7 @@ def _execute_as_either( exceptions=all_dar_exceptions, ) else: + logger.info(f"finished a bulk write on '{self.name}'") return reduce_bulk_write_results(bulk_write_successes) def drop(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]: @@ -2178,7 +2232,10 @@ def drop(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]: which avoids using a deceased collection any further. 
""" - return self.database.drop_collection(self, max_time_ms=max_time_ms) # type: ignore[no-any-return] + logger.info(f"dropping collection '{self.name}' (self)") + drop_result = self.database.drop_collection(self, max_time_ms=max_time_ms) + logger.info(f"finished dropping collection '{self.name}' (self)") + return drop_result # type: ignore[no-any-return] def command( self, @@ -2201,12 +2258,16 @@ def command( >>> my_coll.command({"countDocuments": {}}) {'status': {'count': 123}} """ - return self.database.command( # type: ignore[no-any-return] + + logger.info(f"calling command on '{self.name}'") + command_result = self.database.command( body=body, namespace=self.namespace, collection_name=self.name, max_time_ms=max_time_ms, ) + logger.info(f"finished calling command on '{self.name}'") + return command_result # type: ignore[no-any-return] class AsyncCollection: @@ -2405,6 +2466,7 @@ def set_caller( >>> my_coll.set_caller(caller_name="the_caller", caller_version="0.1.0") """ + logger.info(f"setting caller to {caller_name}/{caller_version}") self._astra_db_collection.set_caller( caller_name=caller_name, caller_version=caller_version, @@ -2430,6 +2492,7 @@ async def options(self, *, max_time_ms: Optional[int] = None) -> CollectionOptio CollectionOptions(vector=CollectionVectorOptions(dimension=3, metric='cosine')) """ + logger.info(f"getting collections in search of '{self.name}'") self_descriptors = [ coll_desc async for coll_desc in self.database.list_collections( @@ -2437,6 +2500,7 @@ async def options(self, *, max_time_ms: Optional[int] = None) -> CollectionOptio ) if coll_desc.name == self.name ] + logger.info(f"finished getting collections in search of '{self.name}'") if self_descriptors: return self_descriptors[0].options # type: ignore[no-any-return] else: @@ -2579,10 +2643,12 @@ async def insert_one( """ _document = _collate_vector_to_document(document, vector) + logger.info(f"inserting one document in '{self.name}'") io_response = await self._astra_db_collection.insert_one( _document, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished inserting one document in '{self.name}'") if "insertedIds" in io_response.get("status", {}): if io_response["status"]["insertedIds"]: inserted_id = io_response["status"]["insertedIds"][0] @@ -2729,18 +2795,21 @@ async def insert_many( else: _chunk_size = chunk_size _documents = _collate_vectors_to_documents(documents, vectors) + logger.info(f"inserting {len(_documents)} documents in '{self.name}'") raw_results: List[Dict[str, Any]] = [] timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) if ordered: options = {"ordered": True} inserted_ids: List[Any] = [] for i in range(0, len(_documents), _chunk_size): + logger.info(f"inserting a chunk of documents in '{self.name}'") chunk_response = await self._astra_db_collection.insert_many( documents=_documents[i : i + _chunk_size], options=options, partial_failures_allowed=True, timeout_info=timeout_manager.remaining_timeout_info(), ) + logger.info(f"finished inserting a chunk of documents in '{self.name}'") # accumulate the results in this call chunk_inserted_ids = (chunk_response.get("status") or {}).get( "insertedIds", [] @@ -2764,6 +2833,9 @@ async def insert_many( raw_results=raw_results, inserted_ids=inserted_ids, ) + logger.info( + f"finished inserting {len(_documents)} documents in '{self.name}'" + ) return full_result else: @@ -2776,12 +2848,17 @@ async def concurrent_insert_chunk( document_chunk: List[DocumentType], ) -> Dict[str, Any]: async with sem: - 
return await self._astra_db_collection.insert_many( + logger.info(f"inserting a chunk of documents in '{self.name}'") + im_response = await self._astra_db_collection.insert_many( document_chunk, options=options, partial_failures_allowed=True, timeout_info=timeout_manager.remaining_timeout_info(), ) + logger.info( + f"finished inserting a chunk of documents in '{self.name}'" + ) + return im_response if _concurrency > 1: tasks = [ @@ -2825,6 +2902,9 @@ async def concurrent_insert_chunk( raw_results=raw_results, inserted_ids=inserted_ids, ) + logger.info( + f"finished inserting {len(_documents)} documents in '{self.name}'" + ) return full_result def find( @@ -3245,10 +3325,12 @@ async def count_documents( by this method if this limit is encountered. """ + logger.info("calling count_documents") cd_response = await self._astra_db_collection.count_documents( filter=filter, timeout_info=base_timeout_info(max_time_ms), ) + logger.info("finished calling count_documents") if "count" in cd_response.get("status", {}): count: int = cd_response["status"]["count"] if cd_response["status"].get("moreData", False): @@ -3372,6 +3454,7 @@ async def find_one_and_replace( "returnDocument": return_document, "upsert": upsert, } + logger.info(f"calling find_one_and_replace on '{self.name}'") fo_response = await self._astra_db_collection.find_one_and_replace( replacement=replacement, filter=filter, @@ -3380,6 +3463,7 @@ async def find_one_and_replace( options=options, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_replace on '{self.name}'") if "document" in fo_response.get("data", {}): ret_document = fo_response.get("data", {}).get("document") if ret_document is None: @@ -3469,6 +3553,7 @@ async def replace_one( options = { "upsert": upsert, } + logger.info(f"calling find_one_and_replace on '{self.name}'") fo_response = await self._astra_db_collection.find_one_and_replace( replacement=replacement, filter=filter, @@ -3476,6 +3561,7 @@ async def replace_one( options=options, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_replace on '{self.name}'") if "document" in fo_response.get("data", {}): fo_status = fo_response.get("status") or {} _update_info = _prepare_update_info([fo_status]) @@ -3597,6 +3683,7 @@ async def find_one_and_update( "returnDocument": return_document, "upsert": upsert, } + logger.info(f"calling find_one_and_update on '{self.name}'") fo_response = await self._astra_db_collection.find_one_and_update( update=update, filter=filter, @@ -3605,6 +3692,7 @@ async def find_one_and_update( options=options, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_update on '{self.name}'") if "document" in fo_response.get("data", {}): ret_document = fo_response.get("data", {}).get("document") if ret_document is None: @@ -3697,6 +3785,7 @@ async def update_one( options = { "upsert": upsert, } + logger.info(f"calling find_one_and_update on '{self.name}'") fo_response = await self._astra_db_collection.find_one_and_update( update=update, sort=_sort, @@ -3704,6 +3793,7 @@ async def update_one( options=options, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_update on '{self.name}'") if "document" in fo_response.get("data", {}): fo_status = fo_response.get("status") or {} _update_info = _prepare_update_info([fo_status]) @@ -3796,14 +3886,17 @@ async def update_many( um_statuses: List[Dict[str, Any]] = [] must_proceed = True timeout_manager = 
MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) + logger.info(f"starting update_many on '{self.name}'") while must_proceed: options = {**base_options, **page_state_options} + logger.info(f"calling update_many on '{self.name}'") this_um_response = await self._astra_db_collection.update_many( update=update, filter=filter, options=options, timeout_info=timeout_manager.remaining_timeout_info(), ) + logger.info(f"finished calling update_many on '{self.name}'") this_um_status = this_um_response.get("status") or {} # # if errors, quit early @@ -3836,6 +3929,7 @@ async def update_many( page_state_options = {} update_info = _prepare_update_info(um_statuses) + logger.info(f"finished update_many on '{self.name}'") return UpdateResult( raw_results=um_responses, update_info=update_info, @@ -3913,12 +4007,14 @@ async def find_one_and_delete( _sort = _collate_vector_to_sort(sort, vector) _projection = normalize_optional_projection(projection) + logger.info(f"calling find_one_and_delete on '{self.name}'") fo_response = await self._astra_db_collection.find_one_and_delete( sort=_sort, filter=filter, projection=_projection, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished calling find_one_and_delete on '{self.name}'") if "document" in fo_response.get("data", {}): document = fo_response["data"]["document"] return document # type: ignore[no-any-return] @@ -3989,11 +4085,13 @@ async def delete_one( """ _sort = _collate_vector_to_sort(sort, vector) + logger.info(f"calling delete_one_by_predicate on '{self.name}'") do_response = await self._astra_db_collection.delete_one_by_predicate( filter=filter, timeout_info=base_timeout_info(max_time_ms), sort=_sort, ) + logger.info(f"finished calling delete_one_by_predicate on '{self.name}'") if "deletedCount" in do_response.get("status", {}): deleted_count = do_response["status"]["deletedCount"] if deleted_count == -1: @@ -4073,12 +4171,15 @@ async def delete_many( deleted_count = 0 must_proceed = True timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) + logger.info(f"starting delete_many on '{self.name}'") while must_proceed: + logger.info(f"calling delete_many on '{self.name}'") this_dm_response = await self._astra_db_collection.delete_many( filter=filter, skip_error_check=True, timeout_info=timeout_manager.remaining_timeout_info(), ) + logger.info(f"finished calling delete_many on '{self.name}'") # if errors, quit early if this_dm_response.get("errors", []): partial_result = DeleteResult( @@ -4102,6 +4203,7 @@ async def delete_many( deleted_count += this_dc must_proceed = this_dm_response.get("status", {}).get("moreData", False) + logger.info(f"finished delete_many on '{self.name}'") return DeleteResult( deleted_count=deleted_count, raw_results=dm_responses, @@ -4146,9 +4248,11 @@ async def delete_all(self, *, max_time_ms: Optional[int] = None) -> Dict[str, An which avoids using a deceased collection any further. 
""" + logger.info(f"calling unfiltered delete_many on '{self.name}'") dm_response = await self._astra_db_collection.delete_many( filter={}, timeout_info=base_timeout_info(max_time_ms) ) + logger.info(f"finished calling unfiltered delete_many on '{self.name}'") deleted_count = dm_response["status"]["deletedCount"] if deleted_count == -1: return {"ok": 1} @@ -4238,6 +4342,7 @@ async def bulk_write( if _concurrency > 1 and ordered: raise ValueError("Cannot run ordered bulk_write concurrently.") timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) + logger.info(f"startng a bulk write on '{self.name}'") if ordered: bulk_write_results: List[BulkWriteResult] = [] for operation_i, operation in enumerate(requests): @@ -4280,6 +4385,7 @@ async def bulk_write( exceptions=[dar_exception], ) full_bw_result = reduce_bulk_write_results(bulk_write_results) + logger.info(f"finished a bulk write on '{self.name}'") return full_bw_result else: @@ -4335,6 +4441,7 @@ async def _concurrent_execute_as_either( exceptions=all_dar_exceptions, ) else: + logger.info(f"finished a bulk write on '{self.name}'") return reduce_bulk_write_results(bulk_write_successes) async def drop(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]: @@ -4376,7 +4483,10 @@ async def drop(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]: which avoids using a deceased collection any further. """ - return await self.database.drop_collection(self, max_time_ms=max_time_ms) # type: ignore[no-any-return] + logger.info(f"dropping collection '{self.name}' (self)") + drop_result = await self.database.drop_collection(self, max_time_ms=max_time_ms) + logger.info(f"finished dropping collection '{self.name}' (self)") + return drop_result # type: ignore[no-any-return] async def command( self, @@ -4399,9 +4509,13 @@ async def command( >>> asyncio.await(my_async_coll.command({"countDocuments": {}})) {'status': {'count': 123}} """ - return await self.database.command( # type: ignore[no-any-return] + + logger.info(f"calling command on '{self.name}'") + command_result = await self.database.command( body=body, namespace=self.namespace, collection_name=self.name, max_time_ms=max_time_ms, ) + logger.info(f"finished calling command on '{self.name}'") + return command_result # type: ignore[no-any-return] diff --git a/astrapy/cursors.py b/astrapy/cursors.py index 643532bb..be04914b 100644 --- a/astrapy/cursors.py +++ b/astrapy/cursors.py @@ -16,6 +16,7 @@ import hashlib import json +import logging import time from collections.abc import Iterator, AsyncIterator from typing import ( @@ -50,6 +51,9 @@ from astrapy.collection import AsyncCollection, Collection +logger = logging.getLogger(__name__) + + BC = TypeVar("BC", bound="BaseCursor") T = TypeVar("T") IndexPairType = Tuple[str, Optional[int]] @@ -574,6 +578,7 @@ def _create_iterator(self) -> Iterator[DocumentType]: else: pf_sort = None + logger.info(f"creating iterator on '{self._collection.name}'") iterator = self._collection._astra_db_collection.paginated_find( filter=self._filter, projection=pf_projection, @@ -582,6 +587,7 @@ def _create_iterator(self) -> Iterator[DocumentType]: prefetched=0, timeout_info=base_timeout_info(self._max_time_ms), ) + logger.info(f"finished creating iterator on '{self._collection.name}'") self._started_time_s = time.time() return iterator @@ -639,6 +645,7 @@ def distinct(self, key: str, max_time_ms: Optional[int] = None) -> List[Any]: started=False, overall_max_time_ms=max_time_ms, ) + logger.info(f"running distinct() on 
'{self._collection.name}'") for document in d_cursor: for item in _extractor(document): _item_hash = _hash_document(item) @@ -646,6 +653,7 @@ def distinct(self, key: str, max_time_ms: Optional[int] = None) -> List[Any]: _item_hashes.add(_item_hash) distinct_items.append(item) + logger.info(f"finished running distinct() on '{self._collection.name}'") return distinct_items @@ -778,6 +786,7 @@ def _create_iterator(self) -> AsyncIterator[DocumentType]: else: pf_sort = None + logger.info(f"creating iterator on '{self._collection.name}'") iterator = self._collection._astra_db_collection.paginated_find( filter=self._filter, projection=pf_projection, @@ -786,6 +795,7 @@ def _create_iterator(self) -> AsyncIterator[DocumentType]: prefetched=0, timeout_info=base_timeout_info(self._max_time_ms), ) + logger.info(f"finished creating iterator on '{self._collection.name}'") self._started_time_s = time.time() return iterator @@ -871,6 +881,7 @@ async def distinct(self, key: str, max_time_ms: Optional[int] = None) -> List[An started=False, overall_max_time_ms=max_time_ms, ) + logger.info(f"running distinct() on '{self._collection.name}'") async for document in d_cursor: for item in _extractor(document): _item_hash = _hash_document(item) @@ -878,6 +889,7 @@ async def distinct(self, key: str, max_time_ms: Optional[int] = None) -> List[An _item_hashes.add(_item_hash) distinct_items.append(item) + logger.info(f"finished running distinct() on '{self._collection.name}'") return distinct_items diff --git a/astrapy/database.py b/astrapy/database.py index c079c6f8..ad12b3a6 100644 --- a/astrapy/database.py +++ b/astrapy/database.py @@ -14,6 +14,8 @@ from __future__ import annotations +import logging + from types import TracebackType from typing import Any, Dict, List, Optional, Type, Union, TYPE_CHECKING @@ -35,6 +37,9 @@ from astrapy.admin import AstraDBDatabaseAdmin +logger = logging.getLogger(__name__) + + def _validate_create_collection_options( dimension: Optional[int] = None, metric: Optional[str] = None, @@ -270,6 +275,8 @@ def set_caller( Example: >>> my_db.set_caller(caller_name="the_caller", caller_version="0.1.0") """ + + logger.info(f"setting caller to {caller_name}/{caller_version}") self._astra_db.set_caller( caller_name=caller_name, caller_version=caller_version, @@ -295,12 +302,14 @@ def info(self) -> DatabaseInfo: between the `region` and the `raw_info["region"]` attributes. 
""" + logger.info("getting database info") database_info = fetch_database_info( self._astra_db.api_endpoint, token=self._astra_db.token, namespace=self.namespace, ) if database_info is not None: + logger.info("finished getting database info") return database_info else: raise DevOpsAPIException( @@ -480,6 +489,7 @@ def create_collection( _check_exists = check_exists existing_names: List[str] if _check_exists: + logger.info(f"checking collection existence for '{name}'") existing_names = self.list_collection_names( namespace=namespace, max_time_ms=max_time_ms ) @@ -493,6 +503,7 @@ def create_collection( collection_name=name, ) + logger.info(f"creating collection '{name}'") driver_db.create_collection( name, options=_options, @@ -500,6 +511,7 @@ def create_collection( metric=metric, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished creating collection '{name}'") return self.get_collection(name, namespace=namespace) @recast_method_sync @@ -539,16 +551,20 @@ def drop_collection( if isinstance(name_or_collection, Collection): _namespace = name_or_collection.namespace _name: str = name_or_collection.name + logger.info(f"dropping collection '{_name}'") dc_response = self._astra_db.copy(namespace=_namespace).delete_collection( _name, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished dropping collection '{_name}'") return dc_response.get("status", {}) # type: ignore[no-any-return] else: + logger.info(f"dropping collection '{name_or_collection}'") dc_response = self._astra_db.delete_collection( name_or_collection, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished dropping collection '{name_or_collection}'") return dc_response.get("status", {}) # type: ignore[no-any-return] @recast_method_sync @@ -586,6 +602,7 @@ def list_collections( _client = self._astra_db.copy(namespace=namespace) else: _client = self._astra_db + logger.info("getting collections") gc_response = _client.get_collections( options={"explain": True}, timeout_info=base_timeout_info(max_time_ms) ) @@ -596,6 +613,7 @@ def list_collections( ) else: # we know this is a list of dicts, to marshal into "descriptors" + logger.info("finished getting collections") return CommandCursor( address=self._astra_db.base_url, items=[ @@ -627,11 +645,8 @@ def list_collection_names( ['a_collection', 'another_col'] """ - if namespace: - _client = self._astra_db.copy(namespace=namespace) - else: - _client = self._astra_db - gc_response = _client.get_collections( + logger.info("getting collection names") + gc_response = self._astra_db.copy(namespace=namespace).get_collections( timeout_info=base_timeout_info(max_time_ms) ) if "collections" not in gc_response.get("status", {}): @@ -641,6 +656,7 @@ def list_collection_names( ) else: # we know this is a list of strings + logger.info("finished getting collection names") return gc_response["status"]["collections"] # type: ignore[no-any-return] @recast_method_sync @@ -681,15 +697,23 @@ def command( _client = self._astra_db if collection_name: _collection = _client.collection(collection_name) - return _collection.post_raw_request( + logger.info(f"issuing custom command to API (on '{collection_name}')") + req_response = _collection.post_raw_request( body=body, timeout_info=base_timeout_info(max_time_ms), ) + logger.info( + f"finished issuing custom command to API (on '{collection_name}')" + ) + return req_response else: - return _client.post_raw_request( + logger.info("issuing custom command to API") + req_response = _client.post_raw_request( body=body, 
timeout_info=base_timeout_info(max_time_ms), ) + logger.info("finished issuing custom command to API") + return req_response def get_database_admin( self, @@ -954,6 +978,7 @@ def set_caller( >>> my_db.set_caller(caller_name="the_caller", caller_version="0.1.0") """ + logger.info(f"setting caller to {caller_name}/{caller_version}") self._astra_db.set_caller( caller_name=caller_name, caller_version=caller_version, @@ -979,12 +1004,14 @@ def info(self) -> DatabaseInfo: between the `region` and the `raw_info["region"]` attributes. """ + logger.info("getting database info") database_info = fetch_database_info( self._astra_db.api_endpoint, token=self._astra_db.token, namespace=self.namespace, ) if database_info is not None: + logger.info("finished getting database info") return database_info else: raise DevOpsAPIException( @@ -1171,6 +1198,7 @@ async def create_collection( _check_exists = check_exists existing_names: List[str] if _check_exists: + logger.info(f"checking collection existence for '{name}'") existing_names = await self.list_collection_names( namespace=namespace, max_time_ms=max_time_ms ) @@ -1184,6 +1212,7 @@ async def create_collection( collection_name=name, ) + logger.info(f"creating collection '{name}'") await driver_db.create_collection( name, options=_options, @@ -1191,6 +1220,7 @@ async def create_collection( metric=metric, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished creating collection '{name}'") return await self.get_collection(name, namespace=namespace) @recast_method_async @@ -1230,18 +1260,22 @@ async def drop_collection( if isinstance(name_or_collection, AsyncCollection): _namespace = name_or_collection.namespace _name = name_or_collection.name + logger.info(f"dropping collection '{_name}'") dc_response = await self._astra_db.copy( namespace=_namespace ).delete_collection( _name, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished dropping collection '{_name}'") return dc_response.get("status", {}) # type: ignore[no-any-return] else: + logger.info(f"dropping collection '{name_or_collection}'") dc_response = await self._astra_db.delete_collection( name_or_collection, timeout_info=base_timeout_info(max_time_ms), ) + logger.info(f"finished dropping collection '{name_or_collection}'") return dc_response.get("status", {}) # type: ignore[no-any-return] @recast_method_sync @@ -1282,6 +1316,7 @@ def list_collections( _client = self._astra_db.copy(namespace=namespace) else: _client = self._astra_db + logger.info("getting collections") gc_response = _client.to_sync().get_collections( options={"explain": True}, timeout_info=base_timeout_info(max_time_ms), @@ -1293,6 +1328,7 @@ def list_collections( ) else: # we know this is a list of dicts, to marshal into "descriptors" + logger.info("finished getting collections") return AsyncCommandCursor( address=self._astra_db.base_url, items=[ @@ -1324,6 +1360,7 @@ async def list_collection_names( ['a_collection', 'another_col'] """ + logger.info("getting collection names") gc_response = await self._astra_db.copy(namespace=namespace).get_collections( timeout_info=base_timeout_info(max_time_ms) ) @@ -1334,6 +1371,7 @@ async def list_collection_names( ) else: # we know this is a list of strings + logger.info("finished getting collection names") return gc_response["status"]["collections"] # type: ignore[no-any-return] @recast_method_async @@ -1377,15 +1415,23 @@ async def command( _client = self._astra_db if collection_name: _collection = await _client.collection(collection_name) - return await 
_collection.post_raw_request( + logger.info(f"issuing custom command to API (on '{collection_name}')") + req_response = await _collection.post_raw_request( body=body, timeout_info=base_timeout_info(max_time_ms), ) + logger.info( + f"finished issuing custom command to API (on '{collection_name}')" + ) + return req_response else: - return await _client.post_raw_request( + logger.info("issuing custom command to API") + req_response = await _client.post_raw_request( body=body, timeout_info=base_timeout_info(max_time_ms), ) + logger.info("finished issuing custom command to API") + return req_response def get_database_admin( self, diff --git a/tests/idiomatic/integration/test_admin.py b/tests/idiomatic/integration/test_admin.py index e75d41c8..70e46944 100644 --- a/tests/idiomatic/integration/test_admin.py +++ b/tests/idiomatic/integration/test_admin.py @@ -27,6 +27,8 @@ NAMESPACE_TIMEOUT = 30 DATABASE_POLL_SLEEP_TIME = 10 DATABASE_TIMEOUT = 480 +PRE_DROP_SAFETY_POLL_INTERVAL = 5 +PRE_DROP_SAFETY_TIMEOUT = 120 DO_IDIOMATIC_ADMIN_TESTS: bool @@ -107,6 +109,7 @@ def test_astra_db_database_admin(self, env_token: Tuple[str, str]) -> None: # create a db (wait) db_admin = admin.create_database( name=db_name, + namespace="custom_namespace", wait_until_active=True, cloud_provider=db_provider, region=db_region, @@ -118,7 +121,7 @@ def test_astra_db_database_admin(self, env_token: Tuple[str, str]) -> None: # list nss namespaces1 = set(db_admin.list_namespaces()) - assert namespaces1 == {"default_keyspace"} + assert namespaces1 == {"custom_namespace"} # create two namespaces w_create_ns_response = db_admin.create_namespace( @@ -171,7 +174,12 @@ def test_astra_db_database_admin(self, env_token: Tuple[str, str]) -> None: namespaces1b = set(db_admin.list_namespaces()) assert namespaces1b == namespaces1 - # drop db and check + # drop db and check. We wait a little due to "nontransactional cluster md" + wait_until_true( + poll_interval=PRE_DROP_SAFETY_POLL_INTERVAL, + max_seconds=PRE_DROP_SAFETY_TIMEOUT, + condition=lambda: db_admin.info().status == "ACTIVE", + ) db_drop_response = db_admin.drop() assert db_drop_response == {"ok": 1} @@ -273,8 +281,19 @@ def _waiter1() -> bool: # drop databases: the w one through the admin, the nw using its db-admin # (this covers most cases if combined with the # (w, using db-admin) of test_astra_db_database_admin) - db_nw_admin = admin.get_database_admin(created_db_id_nw) - drop_nw_response = db_nw_admin.drop(wait_until_active=False) + assert db_admin_nw == admin.get_database_admin(created_db_id_nw) + # drop db and check. 
We wait a little due to "nontransactional cluster md" + wait_until_true( + poll_interval=PRE_DROP_SAFETY_POLL_INTERVAL, + max_seconds=PRE_DROP_SAFETY_TIMEOUT, + condition=lambda: db_admin_nw.info().status == "ACTIVE", + ) + wait_until_true( + poll_interval=PRE_DROP_SAFETY_POLL_INTERVAL, + max_seconds=PRE_DROP_SAFETY_TIMEOUT, + condition=lambda: db_admin_w.info().status == "ACTIVE", + ) + drop_nw_response = db_admin_nw.drop(wait_until_active=False) assert drop_nw_response == {"ok": 1} drop_w_response = admin.drop_database(created_db_id_w) assert drop_w_response == {"ok": 1} diff --git a/tests/idiomatic/integration/test_ddl_async.py b/tests/idiomatic/integration/test_ddl_async.py index e619945d..264ed24b 100644 --- a/tests/idiomatic/integration/test_ddl_async.py +++ b/tests/idiomatic/integration/test_ddl_async.py @@ -90,7 +90,8 @@ async def test_collection_default_id_type_async( default_id_type=DefaultIdType.UUID, ) assert (await acol.options()).default_id.default_id_type == DefaultIdType.UUID - await acol.insert_one({"role": "probe"}) + i1res = await acol.insert_one({"role": "probe"}) + assert isinstance(i1res.inserted_id, UUID) doc = await acol.find_one({}) assert isinstance(doc["_id"], UUID) await acol.drop() @@ -101,7 +102,9 @@ async def test_collection_default_id_type_async( default_id_type=DefaultIdType.UUIDV6, ) assert (await acol.options()).default_id.default_id_type == DefaultIdType.UUIDV6 - await acol.insert_one({"role": "probe"}) + i1res = await acol.insert_one({"role": "probe"}) + assert isinstance(i1res.inserted_id, UUID) + assert i1res.inserted_id.version == 6 doc = await acol.find_one({}) assert isinstance(doc["_id"], UUID) assert doc["_id"].version == 6 @@ -113,7 +116,9 @@ async def test_collection_default_id_type_async( default_id_type=DefaultIdType.UUIDV7, ) assert (await acol.options()).default_id.default_id_type == DefaultIdType.UUIDV7 - await acol.insert_one({"role": "probe"}) + i1res = await acol.insert_one({"role": "probe"}) + assert isinstance(i1res.inserted_id, UUID) + assert i1res.inserted_id.version == 7 doc = await acol.find_one({}) assert isinstance(doc["_id"], UUID) assert doc["_id"].version == 7 @@ -137,7 +142,8 @@ async def test_collection_default_id_type_async( assert ( await acol.options() ).default_id.default_id_type == DefaultIdType.OBJECTID - await acol.insert_one({"role": "probe"}) + i1res = await acol.insert_one({"role": "probe"}) + assert isinstance(i1res.inserted_id, ObjectId) doc = await acol.find_one({}) assert isinstance(doc["_id"], ObjectId) await acol.drop() diff --git a/tests/idiomatic/integration/test_ddl_sync.py b/tests/idiomatic/integration/test_ddl_sync.py index 854698c7..6febe73a 100644 --- a/tests/idiomatic/integration/test_ddl_sync.py +++ b/tests/idiomatic/integration/test_ddl_sync.py @@ -90,7 +90,8 @@ def test_collection_default_id_type_sync( default_id_type=DefaultIdType.UUID, ) assert col.options().default_id.default_id_type == DefaultIdType.UUID - col.insert_one({"role": "probe"}) + i1res = col.insert_one({"role": "probe"}) + assert isinstance(i1res.inserted_id, UUID) doc = col.find_one({}) assert isinstance(doc["_id"], UUID) col.drop() @@ -101,7 +102,9 @@ def test_collection_default_id_type_sync( default_id_type=DefaultIdType.UUIDV6, ) assert col.options().default_id.default_id_type == DefaultIdType.UUIDV6 - col.insert_one({"role": "probe"}) + i1res = col.insert_one({"role": "probe"}) + assert isinstance(i1res.inserted_id, UUID) + assert i1res.inserted_id.version == 6 doc = col.find_one({}) assert isinstance(doc["_id"], UUID) assert 
doc["_id"].version == 6 @@ -113,7 +116,9 @@ def test_collection_default_id_type_sync( default_id_type=DefaultIdType.UUIDV7, ) assert col.options().default_id.default_id_type == DefaultIdType.UUIDV7 - col.insert_one({"role": "probe"}) + i1res = col.insert_one({"role": "probe"}) + assert isinstance(i1res.inserted_id, UUID) + assert i1res.inserted_id.version == 7 doc = col.find_one({}) assert isinstance(doc["_id"], UUID) assert doc["_id"].version == 7 @@ -133,7 +138,8 @@ def test_collection_default_id_type_sync( default_id_type=DefaultIdType.OBJECTID, ) assert col.options().default_id.default_id_type == DefaultIdType.OBJECTID - col.insert_one({"role": "probe"}) + i1res = col.insert_one({"role": "probe"}) + assert isinstance(i1res.inserted_id, ObjectId) doc = col.find_one({}) assert isinstance(doc["_id"], ObjectId) col.drop() diff --git a/tests/idiomatic/integration/test_dml_async.py b/tests/idiomatic/integration/test_dml_async.py index b4f28aa6..09985bb1 100644 --- a/tests/idiomatic/integration/test_dml_async.py +++ b/tests/idiomatic/integration/test_dml_async.py @@ -20,7 +20,7 @@ from astrapy import AsyncCollection from astrapy.results import DeleteResult, InsertOneResult -from astrapy.exceptions import InsertManyException +from astrapy.exceptions import InsertManyException, DataAPIResponseException from astrapy.constants import DocumentType, ReturnDocument, SortDocuments from astrapy.cursors import AsyncCursor from astrapy.operations import ( @@ -340,10 +340,24 @@ async def _alist(acursor: AsyncCursor) -> List[DocumentType]: ) # NONPAGINATED # case 1000 - # len(list(async_empty_collection.find(skip=Nski, limit=None, sort=None, filter=None))) + with pytest.raises(DataAPIResponseException): + len( + await _alist( + async_empty_collection.find( + skip=Nski, limit=None, sort=None, filter=None + ) + ) + ) # case 1001 - # len(list(async_empty_collection.find(skip=Nski, limit=None, sort=None, filter=Nfil))) + with pytest.raises(DataAPIResponseException): + len( + await _alist( + async_empty_collection.find( + skip=Nski, limit=None, sort=None, filter=Nfil + ) + ) + ) # case 1010 assert ( @@ -370,10 +384,24 @@ async def _alist(acursor: AsyncCursor) -> List[DocumentType]: ) # NONPAGINATED # case 1100 - # len(list(async_empty_collection.find(skip=Nski, limit=Nlim, sort=None, filter=None))) + with pytest.raises(DataAPIResponseException): + len( + await _alist( + async_empty_collection.find( + skip=Nski, limit=Nlim, sort=None, filter=None + ) + ) + ) # case 1101 - # len(list(async_empty_collection.find(skip=Nski, limit=Nlim, sort=None, filter=Nfil))) + with pytest.raises(DataAPIResponseException): + len( + await _alist( + async_empty_collection.find( + skip=Nski, limit=Nlim, sort=None, filter=Nfil + ) + ) + ) # case 1110 assert ( diff --git a/tests/idiomatic/integration/test_dml_sync.py b/tests/idiomatic/integration/test_dml_sync.py index 276acb96..dc9ab5de 100644 --- a/tests/idiomatic/integration/test_dml_sync.py +++ b/tests/idiomatic/integration/test_dml_sync.py @@ -19,7 +19,7 @@ from astrapy import Collection from astrapy.results import DeleteResult, InsertOneResult -from astrapy.exceptions import InsertManyException +from astrapy.exceptions import InsertManyException, DataAPIResponseException from astrapy.constants import ReturnDocument, SortDocuments from astrapy.operations import ( InsertOne, @@ -278,10 +278,24 @@ def test_collection_find_sync( ) # NONPAGINATED # case 1000 - # len(list(sync_empty_collection.find(skip=Nski, limit=None, sort=None, filter=None))) + with 
pytest.raises(DataAPIResponseException): + len( + list( + sync_empty_collection.find( + skip=Nski, limit=None, sort=None, filter=None + ) + ) + ) # case 1001 - # len(list(sync_empty_collection.find(skip=Nski, limit=None, sort=None, filter=Nfil))) + with pytest.raises(DataAPIResponseException): + len( + list( + sync_empty_collection.find( + skip=Nski, limit=None, sort=None, filter=Nfil + ) + ) + ) # case 1010 assert ( @@ -308,10 +322,24 @@ def test_collection_find_sync( ) # NONPAGINATED # case 1100 - # len(list(sync_empty_collection.find(skip=Nski, limit=Nlim, sort=None, filter=None))) + with pytest.raises(DataAPIResponseException): + len( + list( + sync_empty_collection.find( + skip=Nski, limit=Nlim, sort=None, filter=None + ) + ) + ) # case 1101 - # len(list(sync_empty_collection.find(skip=Nski, limit=Nlim, sort=None, filter=Nfil))) + with pytest.raises(DataAPIResponseException): + len( + list( + sync_empty_collection.find( + skip=Nski, limit=Nlim, sort=None, filter=Nfil + ) + ) + ) # case 1110 assert (
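
A note on the logging introduced throughout this patch: every touched module now creates its own logger via logging.getLogger(__name__), so they all sit under the "astrapy" logger namespace. A minimal sketch of how a consumer could surface these INFO-level traces follows; the format string is an arbitrary choice, not part of the patch.

import logging

# All loggers in this patch come from logging.getLogger(__name__), so
# enabling the "astrapy" parent logger covers admin, client, collection,
# cursors and database alike.
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s: %(message)s")
logging.getLogger("astrapy").setLevel(logging.INFO)

# From here on, operations emit paired start/finish lines, e.g.:
#   astrapy.collection INFO: inserting 20 documents in 'my_collection'
#   astrapy.collection INFO: finished inserting 20 documents in 'my_collection'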
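On the create_database signature change: cloud_provider and region lose their "gcp"/"us-east1" defaults and become required keyword arguments, while the new optional namespace is dropped from the request payload when None (hence the "if v is not None" comprehension). A usage sketch under stated assumptions: the token, database name and region are placeholders, and get_admin() is assumed from the existing client API.

from astrapy import DataAPIClient

client = DataAPIClient("AstraCS:...")  # placeholder token
admin = client.get_admin()

# cloud_provider and region must now be spelled out; namespace is optional
# and, if omitted, the DevOps API picks its own default keyspace.
db_admin = admin.create_database(
    name="my_database",            # placeholder name
    cloud_provider="aws",
    region="eu-west-1",
    namespace="custom_namespace",
    wait_until_active=True,
)
print(db_admin.list_namespaces())  # ['custom_namespace'], per the new test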
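The insert_many logging brackets each chunked API request, in both the ordered loop and the thread-pool branch. For reference, a call exercising the concurrent path might look like the following; parameter names are taken from the method shown above, the values are arbitrary, and "collection" is assumed to be an already-connected astrapy Collection.

# assuming `collection` is an astrapy Collection
documents = [{"seq": i} for i in range(400)]

result = collection.insert_many(
    documents,
    ordered=False,   # unordered inserts allow chunks to run concurrently
    chunk_size=20,   # documents per underlying API call
    concurrency=10,  # worker threads, cf. DEFAULT_INSERT_MANY_CONCURRENCY
)
print(len(result.inserted_ids))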
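Several of the loops above thread a MultiCallTimeoutManager through successive requests so that a single max_time_ms caps a whole multi-call operation (chunked inserts, paginated update_many/delete_many, status polling). The class itself is not part of this diff; the following is only a conceptual stand-in to illustrate the budget-sharing idea, not astrapy's actual implementation.

import time
from typing import Optional


class BudgetExhausted(Exception):
    """Illustrative counterpart of the library's timeout exceptions."""


class SharedTimeoutBudget:
    """Simplified stand-in for MultiCallTimeoutManager: one deadline,
    consulted before each of the many HTTP calls in a loop."""

    def __init__(self, overall_max_time_ms: Optional[int]) -> None:
        self.deadline: Optional[float] = (
            time.monotonic() + overall_max_time_ms / 1000.0
            if overall_max_time_ms is not None
            else None
        )

    def remaining_timeout_ms(self) -> Optional[int]:
        if self.deadline is None:
            return None  # no overall budget: individual calls are unbounded
        remaining_s = self.deadline - time.monotonic()
        if remaining_s <= 0:
            raise BudgetExhausted("overall max_time_ms used up")
        return int(remaining_s * 1000)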
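Finally, the admin tests guard the new pre-drop checks with a wait_until_true helper whose definition lies outside this diff. A plausible implementation, inferred purely from the call sites (poll_interval, max_seconds, condition):

import time
from typing import Callable


def wait_until_true(
    *,
    poll_interval: float,
    max_seconds: float,
    condition: Callable[[], bool],
) -> None:
    # Poll until the condition holds; fail loudly once the allowance is spent.
    started = time.monotonic()
    while not condition():
        if time.monotonic() - started > max_seconds:
            raise TimeoutError(f"condition still false after {max_seconds}s")
        time.sleep(poll_interval)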