From ede82306c886282d8855bce9a4c246752b2679e6 Mon Sep 17 00:00:00 2001
From: David Butenhof
Date: Fri, 25 Aug 2023 18:32:33 -0400
Subject: [PATCH] Fix publish/delete of unindexed datasets (#3524)

* Fix publish/delete of unindexed datasets

PBENCH-1247

The bulk Elasticsearch API operations (update and delete) have never had
clean handling of un-indexed datasets. The functional tests, for example,
expected "publish" of the archive-only datasets to fail. This was partly
because I did too much "common stuff" inside the bulk action generator.
Additionally, a dataset where indexing fails becomes somewhat "stranded",
as we expect it to be indexed but it isn't.

This PR refactors the checks to move some common operations out of the
action generators, adding checks for eligibility and cleanup. We no longer
prevent publish, for example, due to lack of an index map if the dataset
is archive-only or if indexing failed, and we'll still do the necessary
finalization.
---
 .../api/resources/query_apis/__init__.py      | 102 ++++++++++++-----
 .../api/resources/query_apis/dataset.py       |  59 +++++-----
 lib/pbench/server/database/models/datasets.py |  58 +++++++++-
 .../test/functional/server/test_datasets.py   |  23 ++--
 .../server/query_apis/test_datasets_delete.py | 105 +++++++++++++++--
 .../server/query_apis/test_datasets_update.py | 107 ++++++++++++++++--
 6 files changed, 367 insertions(+), 87 deletions(-)

diff --git a/lib/pbench/server/api/resources/query_apis/__init__.py b/lib/pbench/server/api/resources/query_apis/__init__.py
index 0ec31c2b1b..9cac23a5e9 100644
--- a/lib/pbench/server/api/resources/query_apis/__init__.py
+++ b/lib/pbench/server/api/resources/query_apis/__init__.py
@@ -25,24 +25,23 @@
     ApiMethod,
     ApiParams,
     ApiSchema,
-    MissingParameters,
     ParamType,
     SchemaError,
     UnauthorizedAccess,
-    UnauthorizedAdminAccess,
 )
 import pbench.server.auth.auth as Auth
-from pbench.server.database.database import Database
 from pbench.server.database.models.audit import AuditReason, AuditStatus
 from pbench.server.database.models.datasets import (
     Dataset,
     Metadata,
     Operation,
+    OperationName,
     OperationState,
 )
 from pbench.server.database.models.index_map import IndexMap, IndexStream
 from pbench.server.database.models.templates import Template
 from pbench.server.database.models.users import User
+from pbench.server.sync import Sync
 
 
 class MissingBulkSchemaParameters(SchemaError):
@@ -568,9 +567,49 @@ def __init__(
             self.schemas[ApiMethod.POST].authorization == ApiAuthorizationType.DATASET
         ), f"API {self.__class__.__name__} authorization type must be DATASET"
 
+    def expect_index(self, dataset: Dataset) -> bool:
+        """Are we waiting for an index map?
+
+        If a dataset doesn't have an index map, and we require one, we need
+        to know whether we should expect one to appear in the future. If not,
+        we can usually ignore the requirement (which exists to be sure we
+        don't strand the Elasticsearch documents).
+
+        We don't expect an index map if:
+
+        1) the dataset is marked with "server.archiveonly", so we won't
+           attempt to create an index;
+        2) we attempted to index the dataset but failed, and we'd still like
+           to be able to publish (or delete) the dataset anyway.
+
+        Args:
+            dataset: a Dataset object
+
+        Returns:
+            True if we should expect an index to appear, or False if not
+        """
+        archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE)
+        if archive_only:
+            return False
+        index_state = Operation.by_operation(dataset, OperationName.INDEX)
+        if index_state and index_state.state is OperationState.FAILED:
+            return False
+        return True
+
+    def prepare(self, params: ApiParams, dataset: Dataset, context: ApiContext):
+        """Prepare for the bulk operation
+
+        This is an empty abstract method that can be overridden by a subclass.
+
+        Args:
+            params: Type-normalized client request body JSON
+            dataset: The associated Dataset object
+            context: The operation's ApiContext
+        """
+        pass
+
     def generate_actions(
         self,
-        params: ApiParams,
         dataset: Dataset,
         context: ApiContext,
         map: Iterator[IndexStream],
@@ -589,7 +628,6 @@ def generate_actions(
         This is an abstract method that must be implemented by a subclass.
 
         Args:
-            params: Type-normalized client request body JSON
             dataset: The associated Dataset object
             context: The operation's ApiContext
             map: Elasticsearch index document map generator
@@ -760,19 +798,34 @@ def _bulk_dispatch(
             ApiMethod.POST, ParamType.DATASET, params
         ).value
 
-        operation = (
-            Database.db_session.query(Operation)
-            .filter(
-                Operation.dataset_ref == dataset.id,
-                Operation.state == OperationState.WORKING,
-            )
-            .first()
-        )
-        if context["attributes"].require_stable and operation:
+        operations = Operation.by_state(dataset, OperationState.WORKING)
+        if context["attributes"].require_stable and operations:
             raise APIAbort(
                 HTTPStatus.CONFLICT,
-                f"Dataset is working on {operation.name.name}",
+                f"Dataset is working on {','.join(o.name.name for o in operations)}",
+            )
+
+        component = context["attributes"].operation_name
+        try:
+            sync = Sync(logger=current_app.logger, component=component)
+            sync.update(dataset=dataset, state=OperationState.WORKING)
+            context["sync"] = sync
+        except Exception as e:
+            current_app.logger.warning(
+                "{} {} unable to set {} operational state: '{}'",
+                component,
+                dataset,
+                OperationState.WORKING.name,
+                e,
             )
+            raise APIAbort(HTTPStatus.CONFLICT, "Unable to set operational state")
+
+        try:
+            self.prepare(params, dataset, context)
+        except APIAbort:
+            raise
+        except Exception as e:
+            raise APIInternalError(f"Prepare {dataset.name} error: '{e}'")
 
         # If we don't have an Elasticsearch index map, then the dataset isn't
         # indexed and we skip the Elasticsearch actions.
@@ -786,22 +839,17 @@ def _bulk_dispatch(
         try:
             results = helpers.streaming_bulk(
                 elastic,
-                self.generate_actions(params, dataset, context, map),
+                self.generate_actions(dataset, context, map),
                 raise_on_exception=False,
                 raise_on_error=False,
             )
             report = self._analyze_bulk(results, context)
-        except UnauthorizedAdminAccess as e:
-            raise APIAbort(e.http_status, str(e))
-        except MissingParameters as e:
-            raise APIAbort(e.http_status, str(e))
         except Exception as e:
-            raise APIInternalError("Unexpected backend error") from e
+            raise APIInternalError(f"Unexpected backend error '{e}'") from e
-        elif context["attributes"].require_map and not Metadata.getvalue(
-            dataset, Metadata.SERVER_ARCHIVE
-        ):
+        elif context["attributes"].require_map and self.expect_index(dataset):
             # If the dataset has no index map, the bulk operation requires one,
-            # and the dataset isn't marked "archive only", fail.
+            # and we expect one to appear, fail rather than risking abandoning
+            # Elasticsearch documents.
             raise APIAbort(
                 HTTPStatus.CONFLICT,
                 f"Operation unavailable: dataset {dataset.resource_id} is not indexed.",
             )
@@ -820,11 +868,13 @@ def _bulk_dispatch(
         # Let the subclass complete the operation
         try:
             self.complete(dataset, context, summary)
+            if "sync" in context:
+                context["sync"].update(dataset=dataset, state=OperationState.OK)
         except Exception as e:
             attributes["message"] = str(e)
             auditing["status"] = AuditStatus.WARNING
             auditing["reason"] = AuditReason.INTERNAL
-            raise APIInternalError("Unexpected completion error") from e
+            raise APIInternalError(f"Unexpected completion error '{e}'") from e
 
         # Return the summary document as the success response, or abort with an
         # internal error if we tried to operate on Elasticsearch documents but
diff --git a/lib/pbench/server/api/resources/query_apis/dataset.py b/lib/pbench/server/api/resources/query_apis/dataset.py
index ffdc7d64ef..2e57c293f3 100644
--- a/lib/pbench/server/api/resources/query_apis/dataset.py
+++ b/lib/pbench/server/api/resources/query_apis/dataset.py
@@ -19,13 +19,8 @@
 import pbench.server.auth.auth as Auth
 from pbench.server.cache_manager import CacheManager
 from pbench.server.database.models.audit import AuditType
-from pbench.server.database.models.datasets import (
-    Dataset,
-    OperationName,
-    OperationState,
-)
+from pbench.server.database.models.datasets import Dataset, OperationName
 from pbench.server.database.models.index_map import IndexStream
-from pbench.server.sync import Sync
 
 
 class Datasets(ElasticBulkBase):
@@ -79,9 +74,33 @@ def __init__(self, config: PbenchServerConfig):
             ),
         )
 
+    def prepare(self, params: ApiParams, dataset: Dataset, context: ApiContext):
+        """Prepare for the bulk operation
+
+        Process and validate the API query parameters.
+
+        Args:
+            params: Type-normalized client request body JSON
+            dataset: The associated Dataset object
+            context: The operation's ApiContext
+        """
+
+        if context["attributes"].action != "update":
+            return
+        access = params.query.get("access")
+        owner = params.query.get("owner")
+        if not access and not owner:
+            raise MissingParameters(["access", "owner"])
+        if access:
+            context["access"] = access
+        if owner:
+            authorized_user = Auth.token_auth.current_user()
+            if not authorized_user.is_admin():
+                raise UnauthorizedAdminAccess(authorized_user, OperationCode.UPDATE)
+            context["owner"] = owner
+
     def generate_actions(
         self,
-        params: ApiParams,
         dataset: Dataset,
         context: ApiContext,
         map: Iterator[IndexStream],
@@ -91,7 +110,6 @@ def generate_actions(
         """Generate a series of Elasticsearch bulk actions driven by the
         dataset document map.
 
         Args:
-            params: API parameters
             dataset: the Dataset object
             context: CONTEXT to pass to complete
             map: Elasticsearch index document map generator
 
         Returns:
             A generator for Elasticsearch bulk update actions
         """
         action = context["attributes"].action
-
-        sync = Sync(
-            logger=current_app.logger, component=context["attributes"].operation_name
-        )
-        sync.update(dataset=dataset, state=OperationState.WORKING)
-        context["sync"] = sync
         es_doc = {}
 
-        if action == "update":
-            access = params.query.get("access")
-            owner = params.query.get("owner")
-            if not access and not owner:
-                raise MissingParameters(["access", "owner"])
-            if access:
-                context["access"] = es_doc["access"] = access
-            if owner:
-                authorized_user = Auth.token_auth.current_user()
-                if not authorized_user.is_admin():
-                    raise UnauthorizedAdminAccess(authorized_user, OperationCode.UPDATE)
-                context["owner"] = es_doc["owner"] = owner
+        for field in {"access", "owner"} & set(context):
+            es_doc[field] = context[field]
 
         # Generate a series of bulk operations, which will be passed to
         # the Elasticsearch bulk helper.
@@ -164,11 +166,14 @@ def complete(
                 attributes["owner"] = owner
                 dataset.owner_id = owner
             dataset.update()
-            context["sync"].update(dataset=dataset, state=OperationState.OK)
         elif action == "delete":
             cache_m = CacheManager(self.config, current_app.logger)
             cache_m.delete(dataset.resource_id)
             dataset.delete()
+
+            # Tell caller not to update operational state for the deleted
+            # dataset.
+            del context["sync"]
         else:
             context["sync"].error(
                 dataset=dataset, message=f"Unable to {action} some indexed documents"
diff --git a/lib/pbench/server/database/models/datasets.py b/lib/pbench/server/database/models/datasets.py
index 7efb8ccb19..b7754a8f0c 100644
--- a/lib/pbench/server/database/models/datasets.py
+++ b/lib/pbench/server/database/models/datasets.py
@@ -420,11 +420,7 @@ def as_dict(self) -> Dict[str, Any]:
             metadata_log = Metadata.get(self, Metadata.METALOG).value
         except MetadataNotFound:
             metadata_log = None
-        operations = (
-            Database.db_session.query(Operation)
-            .filter(Operation.dataset_ref == self.id)
-            .all()
-        )
+        operations = Operation.by_dataset(self)
         return {
             "access": self.access,
             "name": self.name,
@@ -531,6 +527,58 @@ class Operation(Database.Base):
     dataset_ref = Column(Integer, ForeignKey("datasets.id"))
     dataset = relationship("Dataset", back_populates="operations")
 
+    @staticmethod
+    def by_dataset(dataset: Dataset) -> list["Operation"]:
+        """Return all operational records for a dataset.
+
+        Args:
+            dataset: Dataset object
+
+        Returns:
+            a list (possibly empty) of operational records
+        """
+        return (
+            Database.db_session.query(Operation)
+            .filter(Operation.dataset_ref == dataset.id)
+            .all()
+        )
+
+    @staticmethod
+    def by_operation(
+        dataset: Dataset, operation: OperationName
+    ) -> Optional["Operation"]:
+        """Return a specific operational record for a dataset.
+
+        Args:
+            dataset: Dataset object
+            operation: the target operation
+
+        Returns:
+            An operational record if it exists, else None
+        """
+        return (
+            Database.db_session.query(Operation)
+            .filter(Operation.dataset_ref == dataset.id, Operation.name == operation)
+            .first()
+        )
+
+    @staticmethod
+    def by_state(dataset: Dataset, state: OperationState) -> list["Operation"]:
+        """Return operational records in a specified state.
+
+        Args:
+            dataset: Dataset object
+            state: the target state
+
+        Returns:
+            A list (possibly empty) of operational records
+        """
+        return (
+            Database.db_session.query(Operation)
+            .filter(Operation.dataset_ref == dataset.id, Operation.state == state)
+            .all()
+        )
 
 
 class Metadata(Database.Base):
     """Retain secondary information about datasets
diff --git a/lib/pbench/test/functional/server/test_datasets.py b/lib/pbench/test/functional/server/test_datasets.py
index fe37d90d65..31fd96ee74 100644
--- a/lib/pbench/test/functional/server/test_datasets.py
+++ b/lib/pbench/test/functional/server/test_datasets.py
@@ -772,25 +772,20 @@ class TestUpdate:
     @pytest.mark.parametrize("access", ("public", "private"))
     def test_publish(self, server_client: PbenchServerClient, login_user, access):
         expected = "public" if access == "private" else "private"
-        datasets = server_client.get_list(
-            access=access, mine="true", metadata=["server.archiveonly"]
-        )
+        datasets = server_client.get_list(access=access, mine="true")
         print(f" ... updating {access} datasets to {expected} ...")
         for dataset in datasets:
             response = server_client.update(
                 dataset.resource_id, access=expected, raise_error=False
             )
-            print(f"\t ... updating {dataset.name} to {access!r}")
-            if response.ok:
-                assert not dataset.metadata["server.archiveonly"]
-                meta = server_client.get_metadata(
-                    dataset.resource_id, metadata="dataset.access"
-                )
-                assert meta["dataset.access"] == expected
-            else:
-                assert dataset.metadata[
-                    "server.archiveonly"
-                ], f"Indexed dataset {dataset.name} failed to update with {response.json()['message']}"
+            print(f"\t ... updating {dataset.name} to {expected!r}")
+            assert (
+                response.ok
+            ), f"Indexed dataset {dataset.name} failed to update with {response.json()['message']}"
+            meta = server_client.get_metadata(
+                dataset.resource_id, metadata="dataset.access"
+            )
+            assert meta["dataset.access"] == expected
 
 
 class TestDelete:
diff --git a/lib/pbench/test/unit/server/query_apis/test_datasets_delete.py b/lib/pbench/test/unit/server/query_apis/test_datasets_delete.py
index 9420a06ffb..855bd45d22 100644
--- a/lib/pbench/test/unit/server/query_apis/test_datasets_delete.py
+++ b/lib/pbench/test/unit/server/query_apis/test_datasets_delete.py
@@ -7,7 +7,15 @@
 from pbench.server import JSON, PbenchServerConfig
 from pbench.server.cache_manager import CacheManager
-from pbench.server.database.models.datasets import Dataset, DatasetNotFound
+from pbench.server.database.models.datasets import (
+    Dataset,
+    DatasetNotFound,
+    Metadata,
+    Operation,
+    OperationName,
+    OperationState,
+)
+from pbench.server.database.models.index_map import IndexMap
 from pbench.test.unit.server.headertypes import HeaderTypes
@@ -95,6 +103,40 @@ def fake_bulk(
         monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk)
 
+    def unelastic(self, monkeypatch):
+        """Cause a failure if the bulk action helper is called.
+
+        Args:
+            monkeypatch: The monkeypatch fixture from the test case
+
+        Raises:
+            an unexpected Exception if called
+        """
+
+        def fake_bulk(
+            elastic: elasticsearch.Elasticsearch,
+            stream: Iterator[dict],
+            raise_on_error: bool = True,
+            raise_on_exception: bool = True,
+        ):
+            """
+            Helper function to mock the Elasticsearch helper streaming_bulk API
+            to throw an exception.
+
+            Args:
+                elastic: An Elasticsearch object
+                stream: The input stream of bulk action dicts
+                raise_on_error: indicates whether errors should be raised
+                raise_on_exception: indicates whether exceptions should propagate
+                    or be trapped
+
+            Raises:
+                Exception
+            """
+            raise Exception("We aren't allowed to be here")
+
+        monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk)
+
     def fake_cache_manager(self, monkeypatch):
         def fake_constructor(self, options: PbenchServerConfig, logger: Logger):
             pass
@@ -155,7 +197,6 @@ def test_query(
     def test_partial(
         self,
         client,
-        capinternal,
        get_document_map,
         monkeypatch,
         server_config,
@@ -178,12 +219,11 @@ def test_partial(
         # Verify that the Dataset still exists
         Dataset.query(name="drb")
 
-    def test_no_dataset(
-        self, client, get_document_map, monkeypatch, pbench_drb_token, server_config
-    ):
+    def test_no_dataset(self, client, monkeypatch, pbench_drb_token, server_config):
         """
         Check the delete API if the dataset doesn't exist.
         """
+        self.unelastic(monkeypatch)
         response = client.delete(
             f"{server_config.rest_uri}/datasets/badwolf",
             headers={"authorization": f"Bearer {pbench_drb_token}"},
@@ -197,9 +237,10 @@ def test_no_index(
         self, client, monkeypatch, attach_dataset, pbench_drb_token, server_config
     ):
         """
-        Check the delete API if the dataset has no INDEX_MAP. It should
+        Check the delete API if the dataset has no index map. It should
         succeed without tripping over Elasticsearch.
         """
+        self.unelastic(monkeypatch)
         self.fake_cache_manager(monkeypatch)
         ds = Dataset.query(name="drb")
         response = client.delete(
@@ -213,13 +254,61 @@ def test_no_index(
         with pytest.raises(DatasetNotFound):
             Dataset.query(name="drb")
 
+    def test_archive_only(
+        self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config
+    ):
+        """
+        Check the delete API if the dataset has no index map but is
+        marked "server.archiveonly". It should succeed without attempting any
+        Elasticsearch operations.
+        """
+
+        self.unelastic(monkeypatch)
+        monkeypatch.setattr(Metadata, "getvalue", lambda d, k: True)
+
+        ds = Dataset.query(name="drb")
+        response = client.delete(
+            f"{server_config.rest_uri}/datasets/{ds.resource_id}",
+            headers={"authorization": f"Bearer {pbench_drb_token}"},
+        )
+
+        # Verify the report and status
+        assert response.status_code == HTTPStatus.OK
+        assert response.json == {"failure": 0, "ok": 0}
+
+    def test_index_error(
+        self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config
+    ):
+        """
+        Check the delete API if the dataset has no index map and is
+        showing an indexing operational error which means it won't be indexed.
+ """ + + self.unelastic(monkeypatch) + monkeypatch.setattr( + Operation, + "by_operation", + lambda d, k: Operation( + name=OperationName.INDEX, state=OperationState.FAILED + ), + ) + + ds = Dataset.query(name="drb") + response = client.delete( + f"{server_config.rest_uri}/datasets/{ds.resource_id}", + headers={"authorization": f"Bearer {pbench_drb_token}"}, + ) + + # Verify the report and status + assert response.status_code == HTTPStatus.OK + assert response.json == {"failure": 0, "ok": 0} + def test_exception( self, attach_dataset, capinternal, client, monkeypatch, - get_document_map, pbench_drb_token, server_config, ): @@ -239,11 +328,13 @@ def fake_bulk( raise elasticsearch.helpers.BulkIndexError("test") monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk) + monkeypatch.setattr(IndexMap, "exists", lambda d: True) response = client.delete( f"{server_config.rest_uri}/datasets/random_md5_string1", headers={"authorization": f"Bearer {pbench_drb_token}"}, ) + assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR # Verify the failure capinternal("Unexpected backend error", response) diff --git a/lib/pbench/test/unit/server/query_apis/test_datasets_update.py b/lib/pbench/test/unit/server/query_apis/test_datasets_update.py index 69062fc57e..251c60a380 100644 --- a/lib/pbench/test/unit/server/query_apis/test_datasets_update.py +++ b/lib/pbench/test/unit/server/query_apis/test_datasets_update.py @@ -5,7 +5,14 @@ import pytest from pbench.server import JSON -from pbench.server.database.models.datasets import Dataset +from pbench.server.database.models.datasets import ( + Dataset, + Metadata, + Operation, + OperationName, + OperationState, +) +from pbench.server.database.models.index_map import IndexMap from pbench.test.unit.server.headertypes import HeaderTypes @@ -93,6 +100,40 @@ def fake_bulk( monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk) + def unelastic(self, monkeypatch): + """Cause a failure if the bulk action helper is called. + + Args: + monkeypatch: The monkeypatch fixture from the test case + + Raises: + an unexpected Exception if called + """ + + def fake_bulk( + elastic: elasticsearch.Elasticsearch, + stream: Iterator[dict], + raise_on_error: bool = True, + raise_on_exception: bool = True, + ): + """ + Helper function to mock the Elasticsearch helper streaming_bulk API, + which will validate the input actions and generate expected responses. + + Args: + elastic: An Elasticsearch object + stream: The input stream of bulk action dicts + raise_on_error: indicates whether errors should be raised + raise_on_exception: indicates whether exceptions should propagate + or be trapped + + Raises: + Exception + """ + raise Exception("We aren't allowed to be here") + + monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk) + def test_partial( self, attach_dataset, @@ -120,13 +161,12 @@ def test_partial( dataset = Dataset.query(name="drb") assert dataset.access == Dataset.PRIVATE_ACCESS - def test_no_dataset( - self, client, get_document_map, monkeypatch, pbench_drb_token, server_config - ): + def test_no_dataset(self, client, monkeypatch, pbench_drb_token, server_config): """ Check the datasets_update API if the dataset doesn't exist. 
""" + self.unelastic(monkeypatch) response = client.post( f"{server_config.rest_uri}/datasets/badwolf", headers={"authorization": f"Bearer {pbench_drb_token}"}, @@ -141,11 +181,11 @@ def test_no_index( self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config ): """ - Check the datasets_update API if the dataset has no INDEX_MAP. It should + Check the update API if the dataset has no index map. It should fail with a CONFLICT error. """ - self.fake_elastic(monkeypatch, {}, True) + self.unelastic(monkeypatch) ds = Dataset.query(name="drb") response = client.post( f"{server_config.rest_uri}/datasets/{ds.resource_id}", @@ -159,13 +199,62 @@ def test_no_index( "message": "Operation unavailable: dataset random_md5_string1 is not indexed." } + def test_archive_only( + self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config + ): + """ + Check the update API if the dataset has no index map but is + marked "server.archiveonly". It should succeed without attempting any + Elasticsearch operations. + """ + + self.unelastic(monkeypatch) + monkeypatch.setattr(Metadata, "getvalue", lambda d, k: True) + + ds = Dataset.query(name="drb") + response = client.post( + f"{server_config.rest_uri}/datasets/{ds.resource_id}", + headers={"authorization": f"Bearer {pbench_drb_token}"}, + query_string=self.PAYLOAD, + ) + + # Verify the report and status + assert response.status_code == HTTPStatus.OK + assert response.json == {"failure": 0, "ok": 0} + + def test_index_error( + self, attach_dataset, client, monkeypatch, pbench_drb_token, server_config + ): + """ + Check the update API if the dataset has no index map and is + showing an indexing operational error which means it won't be indexed. + """ + monkeypatch.setattr( + Operation, + "by_operation", + lambda d, k: Operation( + name=OperationName.INDEX, state=OperationState.FAILED + ), + ) + self.unelastic(monkeypatch) + + ds = Dataset.query(name="drb") + response = client.post( + f"{server_config.rest_uri}/datasets/{ds.resource_id}", + headers={"authorization": f"Bearer {pbench_drb_token}"}, + query_string=self.PAYLOAD, + ) + + # Verify the report and status + assert response.status_code == HTTPStatus.OK + assert response.json == {"failure": 0, "ok": 0} + def test_exception( self, attach_dataset, capinternal, client, monkeypatch, - get_document_map, pbench_drb_token, server_config, ): @@ -185,12 +274,14 @@ def fake_bulk( raise elasticsearch.helpers.BulkIndexError("test") monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk) + monkeypatch.setattr(IndexMap, "exists", lambda d: True) response = client.post( f"{server_config.rest_uri}/datasets/random_md5_string1", headers={"authorization": f"Bearer {pbench_drb_token}"}, query_string=self.PAYLOAD, ) + assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR # Verify the failure capinternal("Unexpected backend error", response) @@ -267,7 +358,7 @@ def test_invalid_owner_params( response = client.post( f"{server_config.rest_uri}/datasets/{ds.resource_id}", headers={"authorization": f"Bearer {pbench_admin_token}"}, - query_string={"owner": str("invalid_owner")}, + query_string={"owner": "invalid_owner"}, ) # Verify the report and status