diff --git a/lib/pbench/server/api/resources/query_apis/__init__.py b/lib/pbench/server/api/resources/query_apis/__init__.py
index 0ec31c2b1b..9cac23a5e9 100644
--- a/lib/pbench/server/api/resources/query_apis/__init__.py
+++ b/lib/pbench/server/api/resources/query_apis/__init__.py
@@ -25,24 +25,23 @@
     ApiMethod,
     ApiParams,
     ApiSchema,
-    MissingParameters,
     ParamType,
     SchemaError,
     UnauthorizedAccess,
-    UnauthorizedAdminAccess,
 )
 import pbench.server.auth.auth as Auth
-from pbench.server.database.database import Database
 from pbench.server.database.models.audit import AuditReason, AuditStatus
 from pbench.server.database.models.datasets import (
     Dataset,
     Metadata,
     Operation,
+    OperationName,
     OperationState,
 )
 from pbench.server.database.models.index_map import IndexMap, IndexStream
 from pbench.server.database.models.templates import Template
 from pbench.server.database.models.users import User
+from pbench.server.sync import Sync
 
 
 class MissingBulkSchemaParameters(SchemaError):
@@ -568,9 +567,49 @@ def __init__(
             self.schemas[ApiMethod.POST].authorization == ApiAuthorizationType.DATASET
         ), f"API {self.__class__.__name__} authorization type must be DATASET"
 
+    def expect_index(self, dataset: Dataset) -> bool:
+        """Are we waiting for an index map?
+
+        If a dataset doesn't have an index map and we require one, we need to
+        know whether we should expect one in the future. If not, we can
+        usually ignore the requirement (which exists to ensure we don't
+        strand the Elasticsearch documents).
+
+        We don't expect an index map if:
+
+        1) the dataset is marked "server.archiveonly", in which case we won't
+           attempt to create an index; or
+        2) we attempted to index the dataset but failed, in which case we'd
+           still like to be able to publish (or delete) the dataset.
+
+        Args:
+            dataset: a Dataset object
+
+        Returns:
+            True if we should expect an index to appear, or False if not
+        """
+        archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE)
+        if archive_only:
+            return False
+        index_state = Operation.by_operation(dataset, OperationName.INDEX)
+        if index_state and index_state.state is OperationState.FAILED:
+            return False
+        return True
+
+    def prepare(self, params: ApiParams, dataset: Dataset, context: ApiContext):
+        """Prepare for the bulk operation
+
+        This is a no-op default implementation that a subclass can override.
+
+        Args:
+            params: Type-normalized client parameters
+            dataset: The associated Dataset object
+            context: The operation's ApiContext
+        """
+        pass
+
     def generate_actions(
         self,
-        params: ApiParams,
         dataset: Dataset,
         context: ApiContext,
         map: Iterator[IndexStream],
@@ -589,7 +628,6 @@ def generate_actions(
         This is an abstract method that must be implemented by a subclass.
 
         Args:
-            params: Type-normalized client request body JSON
             dataset: The associated Dataset object
             context: The operation's ApiContext
             map: Elasticsearch index document map generator
@@ -760,19 +798,34 @@ def _bulk_dispatch(
             ApiMethod.POST, ParamType.DATASET, params
         ).value
 
-        operation = (
-            Database.db_session.query(Operation)
-            .filter(
-                Operation.dataset_ref == dataset.id,
-                Operation.state == OperationState.WORKING,
-            )
-            .first()
-        )
-        if context["attributes"].require_stable and operation:
+        operations = Operation.by_state(dataset, OperationState.WORKING)
+        if context["attributes"].require_stable and operations:
             raise APIAbort(
                 HTTPStatus.CONFLICT,
-                f"Dataset is working on {operation.name.name}",
+                f"Dataset is working on {','.join(o.name.name for o in operations)}",
+            )
+
+        component = context["attributes"].operation_name
+        try:
+            sync = Sync(logger=current_app.logger, component=component)
+            sync.update(dataset=dataset, state=OperationState.WORKING)
+            context["sync"] = sync
+        except Exception as e:
+            current_app.logger.warning(
+                "{} {} unable to set {} operational state: '{}'",
+                component,
+                dataset,
+                OperationState.WORKING.name,
+                e,
             )
+            raise APIAbort(HTTPStatus.CONFLICT, "Unable to set operational state")
+
+        try:
+            self.prepare(params, dataset, context)
+        except APIAbort:
+            raise
+        except Exception as e:
+            raise APIInternalError(f"Prepare {dataset.name} error: '{e}'")
 
         # If we don't have an Elasticsearch index map, then the dataset isn't
        # indexed and we skip the Elasticsearch actions.
@@ -786,22 +839,17 @@ def _bulk_dispatch(
             try:
                 results = helpers.streaming_bulk(
                     elastic,
-                    self.generate_actions(params, dataset, context, map),
+                    self.generate_actions(dataset, context, map),
                     raise_on_exception=False,
                     raise_on_error=False,
                 )
                 report = self._analyze_bulk(results, context)
-            except UnauthorizedAdminAccess as e:
-                raise APIAbort(e.http_status, str(e))
-            except MissingParameters as e:
-                raise APIAbort(e.http_status, str(e))
             except Exception as e:
-                raise APIInternalError("Unexpected backend error") from e
-        elif context["attributes"].require_map and not Metadata.getvalue(
-            dataset, Metadata.SERVER_ARCHIVE
-        ):
+                raise APIInternalError(f"Unexpected backend error '{e}'") from e
+        elif context["attributes"].require_map and self.expect_index(dataset):
             # If the dataset has no index map, the bulk operation requires one,
-            # and the dataset isn't marked "archive only", fail.
+            # and we expect one to appear, fail rather than risking abandoning
+            # Elasticsearch documents.
             raise APIAbort(
                 HTTPStatus.CONFLICT,
                 f"Operation unavailable: dataset {dataset.resource_id} is not indexed.",
             )
@@ -820,11 +868,13 @@ def _bulk_dispatch(
         # Let the subclass complete the operation
         try:
             self.complete(dataset, context, summary)
+            if "sync" in context:
+                context["sync"].update(dataset=dataset, state=OperationState.OK)
         except Exception as e:
             attributes["message"] = str(e)
             auditing["status"] = AuditStatus.WARNING
             auditing["reason"] = AuditReason.INTERNAL
-            raise APIInternalError("Unexpected completion error") from e
+            raise APIInternalError(f"Unexpected completion error '{e}'") from e
 
         # Return the summary document as the success response, or abort with an
         # internal error if we tried to operate on Elasticsearch documents but
diff --git a/lib/pbench/server/api/resources/query_apis/dataset.py b/lib/pbench/server/api/resources/query_apis/dataset.py
index ffdc7d64ef..2e57c293f3 100644
--- a/lib/pbench/server/api/resources/query_apis/dataset.py
+++ b/lib/pbench/server/api/resources/query_apis/dataset.py
@@ -19,13 +19,8 @@
 import pbench.server.auth.auth as Auth
 from pbench.server.cache_manager import CacheManager
 from pbench.server.database.models.audit import AuditType
-from pbench.server.database.models.datasets import (
-    Dataset,
-    OperationName,
-    OperationState,
-)
+from pbench.server.database.models.datasets import Dataset, OperationName
 from pbench.server.database.models.index_map import IndexStream
-from pbench.server.sync import Sync
 
 
 class Datasets(ElasticBulkBase):
@@ -79,9 +74,33 @@ def __init__(self, config: PbenchServerConfig):
             ),
         )
 
+    def prepare(self, params: ApiParams, dataset: Dataset, context: ApiContext):
+        """Prepare for the bulk operation
+
+        Process and validate the API query parameters.
+
+        Args:
+            params: Type-normalized client parameters
+            dataset: The associated Dataset object
+            context: The operation's ApiContext
+        """
+
+        if context["attributes"].action != "update":
+            return
+        access = params.query.get("access")
+        owner = params.query.get("owner")
+        if not access and not owner:
+            raise MissingParameters(["access", "owner"])
+        if access:
+            context["access"] = access
+        if owner:
+            authorized_user = Auth.token_auth.current_user()
+            if not authorized_user.is_admin():
+                raise UnauthorizedAdminAccess(authorized_user, OperationCode.UPDATE)
+            context["owner"] = owner
+
     def generate_actions(
         self,
-        params: ApiParams,
         dataset: Dataset,
         context: ApiContext,
         map: Iterator[IndexStream],
@@ -91,7 +110,6 @@ def generate_actions(
         dataset document map.
 
         Args:
-            params: API parameters
             dataset: the Dataset object
             context: CONTEXT to pass to complete
             map: Elasticsearch index document map generator
@@ -100,26 +118,10 @@ def generate_actions(
             A generator for Elasticsearch bulk update actions
         """
         action = context["attributes"].action
-
-        sync = Sync(
-            logger=current_app.logger, component=context["attributes"].operation_name
-        )
-        sync.update(dataset=dataset, state=OperationState.WORKING)
-        context["sync"] = sync
         es_doc = {}
 
-        if action == "update":
-            access = params.query.get("access")
-            owner = params.query.get("owner")
-            if not access and not owner:
-                raise MissingParameters(["access", "owner"])
-            if access:
-                context["access"] = es_doc["access"] = access
-            if owner:
-                authorized_user = Auth.token_auth.current_user()
-                if not authorized_user.is_admin():
-                    raise UnauthorizedAdminAccess(authorized_user, OperationCode.UPDATE)
-                context["owner"] = es_doc["owner"] = owner
+        for field in {"access", "owner"} & set(context):
+            es_doc[field] = context[field]
 
         # Generate a series of bulk operations, which will be passed to
         # the Elasticsearch bulk helper.
@@ -164,11 +166,14 @@ def complete(
                 attributes["owner"] = owner
                 dataset.owner_id = owner
             dataset.update()
-            context["sync"].update(dataset=dataset, state=OperationState.OK)
         elif action == "delete":
             cache_m = CacheManager(self.config, current_app.logger)
             cache_m.delete(dataset.resource_id)
             dataset.delete()
+
+            # Tell caller not to update operational state for the deleted
+            # dataset.
+            del context["sync"]
         else:
             context["sync"].error(
                 dataset=dataset, message=f"Unable to {action} some indexed documents"
diff --git a/lib/pbench/server/database/models/datasets.py b/lib/pbench/server/database/models/datasets.py
index 7efb8ccb19..b7754a8f0c 100644
--- a/lib/pbench/server/database/models/datasets.py
+++ b/lib/pbench/server/database/models/datasets.py
@@ -420,11 +420,7 @@ def as_dict(self) -> Dict[str, Any]:
             metadata_log = Metadata.get(self, Metadata.METALOG).value
         except MetadataNotFound:
             metadata_log = None
-        operations = (
-            Database.db_session.query(Operation)
-            .filter(Operation.dataset_ref == self.id)
-            .all()
-        )
+        operations = Operation.by_dataset(self)
         return {
             "access": self.access,
             "name": self.name,
@@ -531,6 +527,58 @@ class Operation(Database.Base):
     dataset_ref = Column(Integer, ForeignKey("datasets.id"))
     dataset = relationship("Dataset", back_populates="operations")
 
+    @staticmethod
+    def by_dataset(dataset: Dataset) -> list["Operation"]:
+        """Return all operational records for a dataset.
+
+        Args:
+            dataset: Dataset object
+
+        Returns:
+            A list (possibly empty) of operational records
+        """
+        return (
+            Database.db_session.query(Operation)
+            .filter(Operation.dataset_ref == dataset.id)
+            .all()
+        )
+
+    @staticmethod
+    def by_operation(
+        dataset: Dataset, operation: OperationName
+    ) -> Optional["Operation"]:
+        """Return a specific operational record for a dataset.
+
+        Args:
+            dataset: Dataset object
+            operation: the target operation
+
+        Returns:
+            An operational record if it exists, else None
+        """
+        return (
+            Database.db_session.query(Operation)
+            .filter(Operation.dataset_ref == dataset.id, Operation.name == operation)
+            .first()
+        )
+
+    @staticmethod
+    def by_state(dataset: Dataset, state: OperationState) -> list["Operation"]:
+        """Return operational records in a specified state.
+
+        Args:
+            dataset: Dataset object
+            state: the target state
+
+        Returns:
+            A list (possibly empty) of operational records
+        """
+        return (
+            Database.db_session.query(Operation)
+            .filter(Operation.dataset_ref == dataset.id, Operation.state == state)
+            .all()
+        )
+
 
 class Metadata(Database.Base):
     """Retain secondary information about datasets
diff --git a/lib/pbench/test/functional/server/test_datasets.py b/lib/pbench/test/functional/server/test_datasets.py
index fe37d90d65..31fd96ee74 100644
--- a/lib/pbench/test/functional/server/test_datasets.py
+++ b/lib/pbench/test/functional/server/test_datasets.py
@@ -772,25 +772,20 @@ class TestUpdate:
     @pytest.mark.parametrize("access", ("public", "private"))
     def test_publish(self, server_client: PbenchServerClient, login_user, access):
         expected = "public" if access == "private" else "private"
-        datasets = server_client.get_list(
-            access=access, mine="true", metadata=["server.archiveonly"]
-        )
+        datasets = server_client.get_list(access=access, mine="true")
         print(f" ... updating {access} datasets to {expected} ...")
         for dataset in datasets:
             response = server_client.update(
                 dataset.resource_id, access=expected, raise_error=False
             )
-            print(f"\t ... updating {dataset.name} to {access!r}")
-            if response.ok:
-                assert not dataset.metadata["server.archiveonly"]
-                meta = server_client.get_metadata(
-                    dataset.resource_id, metadata="dataset.access"
-                )
-                assert meta["dataset.access"] == expected
-            else:
-                assert dataset.metadata[
-                    "server.archiveonly"
-                ], f"Indexed dataset {dataset.name} failed to update with {response.json()['message']}"
+            print(f"\t ... updating {dataset.name} to {expected!r}")
+            assert (
+                response.ok
+            ), f"Dataset {dataset.name} failed to update: {response.json()['message']}"
+            meta = server_client.get_metadata(
+                dataset.resource_id, metadata="dataset.access"
+            )
+            assert meta["dataset.access"] == expected
 
 
 class TestDelete:
diff --git a/lib/pbench/test/unit/server/query_apis/test_datasets_delete.py b/lib/pbench/test/unit/server/query_apis/test_datasets_delete.py
index 9420a06ffb..855bd45d22 100644
--- a/lib/pbench/test/unit/server/query_apis/test_datasets_delete.py
+++ b/lib/pbench/test/unit/server/query_apis/test_datasets_delete.py
@@ -7,7 +7,15 @@
 
 from pbench.server import JSON, PbenchServerConfig
 from pbench.server.cache_manager import CacheManager
-from pbench.server.database.models.datasets import Dataset, DatasetNotFound
+from pbench.server.database.models.datasets import (
+    Dataset,
+    DatasetNotFound,
+    Metadata,
+    Operation,
+    OperationName,
+    OperationState,
+)
+from pbench.server.database.models.index_map import IndexMap
 from pbench.test.unit.server.headertypes import HeaderTypes
 
@@ -95,6 +103,40 @@ def fake_bulk(
 
         monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk)
 
+    def unelastic(self, monkeypatch):
+        """Cause a failure if the bulk action helper is called.
+
+        Args:
+            monkeypatch: The monkeypatch fixture from the test case
+
+        Raises:
+            an unexpected Exception if called
+        """
+
+        def fake_bulk(
+            elastic: elasticsearch.Elasticsearch,
+            stream: Iterator[dict],
+            raise_on_error: bool = True,
+            raise_on_exception: bool = True,
+        ):
+            """
+            Helper function to mock the Elasticsearch helper streaming_bulk API
+            to throw an exception.
+
+            Args:
+                elastic: An Elasticsearch object
+                stream: The input stream of bulk action dicts
+                raise_on_error: indicates whether errors should be raised
+                raise_on_exception: indicates whether exceptions should propagate
+                    or be trapped
+
+            Raises:
+                Exception
+            """
+            raise Exception("We aren't allowed to be here")
+
+        monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk)
+
     def fake_cache_manager(self, monkeypatch):
         def fake_constructor(self, options: PbenchServerConfig, logger: Logger):
             pass
@@ -155,7 +197,6 @@ def test_query(
     def test_partial(
         self,
         client,
-        capinternal,
         get_document_map,
         monkeypatch,
         server_config,
@@ -178,12 +219,11 @@ def test_partial(
         # Verify that the Dataset still exists
         Dataset.query(name="drb")
 
-    def test_no_dataset(
-        self, client, get_document_map, monkeypatch, pbench_drb_token, server_config
-    ):
+    def test_no_dataset(self, client, monkeypatch, pbench_drb_token, server_config):
         """
         Check the delete API if the dataset doesn't exist.
         """
+        self.unelastic(monkeypatch)
         response = client.delete(
             f"{server_config.rest_uri}/datasets/badwolf",
             headers={"authorization": f"Bearer {pbench_drb_token}"},
@@ -197,9 +237,10 @@ def test_no_index(
         self, client, monkeypatch, attach_dataset, pbench_drb_token, server_config
     ):
         """
-        Check the delete API if the dataset has no INDEX_MAP. It should
+        Check the delete API if the dataset has no index map. It should
         succeed without tripping over Elasticsearch.
         """
+        self.unelastic(monkeypatch)
         self.fake_cache_manager(monkeypatch)
         ds = Dataset.query(name="drb")
         response = client.delete(
@@ -213,13 +254,61 @@ def test_no_index(
         with pytest.raises(DatasetNotFound):
             Dataset.query(name="drb")
 
+    def test_archive_only(
+        self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config
+    ):
+        """
+        Check the delete API if the dataset has no index map but is
+        marked "server.archiveonly". It should succeed without attempting any
+        Elasticsearch operations.
+        """
+
+        self.unelastic(monkeypatch)
+        monkeypatch.setattr(Metadata, "getvalue", lambda d, k: True)
+
+        ds = Dataset.query(name="drb")
+        response = client.delete(
+            f"{server_config.rest_uri}/datasets/{ds.resource_id}",
+            headers={"authorization": f"Bearer {pbench_drb_token}"},
+        )
+
+        # Verify the report and status
+        assert response.status_code == HTTPStatus.OK
+        assert response.json == {"failure": 0, "ok": 0}
+
+    def test_index_error(
+        self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config
+    ):
+        """
+        Check the delete API if the dataset has no index map and its
+        indexing operation failed, meaning it will never be indexed.
+ """ + + self.unelastic(monkeypatch) + monkeypatch.setattr( + Operation, + "by_operation", + lambda d, k: Operation( + name=OperationName.INDEX, state=OperationState.FAILED + ), + ) + + ds = Dataset.query(name="drb") + response = client.delete( + f"{server_config.rest_uri}/datasets/{ds.resource_id}", + headers={"authorization": f"Bearer {pbench_drb_token}"}, + ) + + # Verify the report and status + assert response.status_code == HTTPStatus.OK + assert response.json == {"failure": 0, "ok": 0} + def test_exception( self, attach_dataset, capinternal, client, monkeypatch, - get_document_map, pbench_drb_token, server_config, ): @@ -239,11 +328,13 @@ def fake_bulk( raise elasticsearch.helpers.BulkIndexError("test") monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk) + monkeypatch.setattr(IndexMap, "exists", lambda d: True) response = client.delete( f"{server_config.rest_uri}/datasets/random_md5_string1", headers={"authorization": f"Bearer {pbench_drb_token}"}, ) + assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR # Verify the failure capinternal("Unexpected backend error", response) diff --git a/lib/pbench/test/unit/server/query_apis/test_datasets_update.py b/lib/pbench/test/unit/server/query_apis/test_datasets_update.py index 69062fc57e..251c60a380 100644 --- a/lib/pbench/test/unit/server/query_apis/test_datasets_update.py +++ b/lib/pbench/test/unit/server/query_apis/test_datasets_update.py @@ -5,7 +5,14 @@ import pytest from pbench.server import JSON -from pbench.server.database.models.datasets import Dataset +from pbench.server.database.models.datasets import ( + Dataset, + Metadata, + Operation, + OperationName, + OperationState, +) +from pbench.server.database.models.index_map import IndexMap from pbench.test.unit.server.headertypes import HeaderTypes @@ -93,6 +100,40 @@ def fake_bulk( monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk) + def unelastic(self, monkeypatch): + """Cause a failure if the bulk action helper is called. + + Args: + monkeypatch: The monkeypatch fixture from the test case + + Raises: + an unexpected Exception if called + """ + + def fake_bulk( + elastic: elasticsearch.Elasticsearch, + stream: Iterator[dict], + raise_on_error: bool = True, + raise_on_exception: bool = True, + ): + """ + Helper function to mock the Elasticsearch helper streaming_bulk API, + which will validate the input actions and generate expected responses. + + Args: + elastic: An Elasticsearch object + stream: The input stream of bulk action dicts + raise_on_error: indicates whether errors should be raised + raise_on_exception: indicates whether exceptions should propagate + or be trapped + + Raises: + Exception + """ + raise Exception("We aren't allowed to be here") + + monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk) + def test_partial( self, attach_dataset, @@ -120,13 +161,12 @@ def test_partial( dataset = Dataset.query(name="drb") assert dataset.access == Dataset.PRIVATE_ACCESS - def test_no_dataset( - self, client, get_document_map, monkeypatch, pbench_drb_token, server_config - ): + def test_no_dataset(self, client, monkeypatch, pbench_drb_token, server_config): """ Check the datasets_update API if the dataset doesn't exist. 
""" + self.unelastic(monkeypatch) response = client.post( f"{server_config.rest_uri}/datasets/badwolf", headers={"authorization": f"Bearer {pbench_drb_token}"}, @@ -141,11 +181,11 @@ def test_no_index( self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config ): """ - Check the datasets_update API if the dataset has no INDEX_MAP. It should + Check the update API if the dataset has no index map. It should fail with a CONFLICT error. """ - self.fake_elastic(monkeypatch, {}, True) + self.unelastic(monkeypatch) ds = Dataset.query(name="drb") response = client.post( f"{server_config.rest_uri}/datasets/{ds.resource_id}", @@ -159,13 +199,62 @@ def test_no_index( "message": "Operation unavailable: dataset random_md5_string1 is not indexed." } + def test_archive_only( + self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config + ): + """ + Check the update API if the dataset has no index map but is + marked "server.archiveonly". It should succeed without attempting any + Elasticsearch operations. + """ + + self.unelastic(monkeypatch) + monkeypatch.setattr(Metadata, "getvalue", lambda d, k: True) + + ds = Dataset.query(name="drb") + response = client.post( + f"{server_config.rest_uri}/datasets/{ds.resource_id}", + headers={"authorization": f"Bearer {pbench_drb_token}"}, + query_string=self.PAYLOAD, + ) + + # Verify the report and status + assert response.status_code == HTTPStatus.OK + assert response.json == {"failure": 0, "ok": 0} + + def test_index_error( + self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config + ): + """ + Check the update API if the dataset has no index map and is + showing an indexing operational error which means it won't be indexed. + """ + monkeypatch.setattr( + Operation, + "by_operation", + lambda d, k: Operation( + name=OperationName.INDEX, state=OperationState.FAILED + ), + ) + self.unelastic(monkeypatch) + + ds = Dataset.query(name="drb") + response = client.post( + f"{server_config.rest_uri}/datasets/{ds.resource_id}", + headers={"authorization": f"Bearer {pbench_drb_token}"}, + query_string=self.PAYLOAD, + ) + + # Verify the report and status + assert response.status_code == HTTPStatus.OK + assert response.json == {"failure": 0, "ok": 0} + def test_exception( self, attach_dataset, capinternal, client, monkeypatch, - get_document_map, pbench_drb_token, server_config, ): @@ -185,12 +274,14 @@ def fake_bulk( raise elasticsearch.helpers.BulkIndexError("test") monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk) + monkeypatch.setattr(IndexMap, "exists", lambda d: True) response = client.post( f"{server_config.rest_uri}/datasets/random_md5_string1", headers={"authorization": f"Bearer {pbench_drb_token}"}, query_string=self.PAYLOAD, ) + assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR # Verify the failure capinternal("Unexpected backend error", response) @@ -267,7 +358,7 @@ def test_invalid_owner_params( response = client.post( f"{server_config.rest_uri}/datasets/{ds.resource_id}", headers={"authorization": f"Bearer {pbench_admin_token}"}, - query_string={"owner": str("invalid_owner")}, + query_string={"owner": "invalid_owner"}, ) # Verify the report and status