Fix publish/delete of unindexed datasets (distributed-system-analysis#3524)

* Fix publish/delete of unindexed datasets

PBENCH-1247

The bulk Elasticsearch API operations (update and delete) have never handled
unindexed datasets cleanly. The functional tests, for example, expected
"publish" of archive-only datasets to fail. This was partly because I did too
much "common stuff" inside the bulk action generator. Additionally, a dataset
whose indexing fails becomes somewhat "stranded": we expect it to be indexed,
but it isn't.

This PR refactors the checks, moving some common operations out of the action
generators and adding checks for eligibility and cleanup. For example, we no
longer prevent publish for lack of an index map when the dataset is
archive-only or when indexing failed, and we still perform the necessary
finalization.
dbutenhof authored Aug 25, 2023
1 parent b9d0ed0 commit ede8230
Showing 6 changed files with 367 additions and 87 deletions.
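A note before the per-file diffs: the heart of the fix is an eligibility check
that decides whether a missing index map should block publish or delete. The
sketch below condenses that logic from the expect_index helper added in this
commit; it mirrors the diff but is illustrative rather than the literal server
code.

from pbench.server.database.models.datasets import (
    Dataset,
    Metadata,
    Operation,
    OperationName,
    OperationState,
)

def expects_index(dataset: Dataset) -> bool:
    # A missing index map only blocks the operation when we still expect an
    # index to appear: the dataset is not archive-only and the INDEX
    # operation has not already failed.
    if Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE):
        return False  # archive-only: we never try to index it
    index_op = Operation.by_operation(dataset, OperationName.INDEX)
    if index_op and index_op.state is OperationState.FAILED:
        return False  # indexing failed: allow publish/delete anyway
    return True  # an index should eventually exist; don't strand its documents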
102 changes: 76 additions & 26 deletions lib/pbench/server/api/resources/query_apis/__init__.py
@@ -25,24 +25,23 @@
ApiMethod,
ApiParams,
ApiSchema,
MissingParameters,
ParamType,
SchemaError,
UnauthorizedAccess,
UnauthorizedAdminAccess,
)
import pbench.server.auth.auth as Auth
from pbench.server.database.database import Database
from pbench.server.database.models.audit import AuditReason, AuditStatus
from pbench.server.database.models.datasets import (
Dataset,
Metadata,
Operation,
OperationName,
OperationState,
)
from pbench.server.database.models.index_map import IndexMap, IndexStream
from pbench.server.database.models.templates import Template
from pbench.server.database.models.users import User
from pbench.server.sync import Sync


class MissingBulkSchemaParameters(SchemaError):
@@ -568,9 +567,49 @@ def __init__(
self.schemas[ApiMethod.POST].authorization == ApiAuthorizationType.DATASET
), f"API {self.__class__.__name__} authorization type must be DATASET"

def expect_index(self, dataset: Dataset) -> bool:
"""Are we waiting for an index map?
If a dataset doesn't have an index map, and we require one, we need to
know whether we should expect one in the future. If not, we can usually
ignore the requirement (which is to be sure we don't strand the
Elasticsearch documents).
We don't expect an index map if:
1) the dataset is marked with "server.archiveonly", so we never attempt
to create an index; or
2) we attempted to index the dataset but failed, and we'd like to be
able to publish (or delete) the dataset anyway.
Args:
dataset: a Dataset object
Returns:
True if we should expect an index to appear, or False if not
"""
archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE)
if archive_only:
return False
index_state = Operation.by_operation(dataset, OperationName.INDEX)
if index_state and index_state.state is OperationState.FAILED:
return False
return True

def prepare(self, params: ApiParams, dataset: Dataset, context: ApiContext):
"""Prepare for the bulk operation
This is an empty abstract method that can be overridden by a subclass.
Args:
params: Type-normalized client request body JSON
dataset: The associated Dataset object
context: The operation's ApiContext
"""
pass

def generate_actions(
self,
params: ApiParams,
dataset: Dataset,
context: ApiContext,
map: Iterator[IndexStream],
@@ -589,7 +628,6 @@ def generate_actions(
This is an abstract method that must be implemented by a subclass.
Args:
params: Type-normalized client request body JSON
dataset: The associated Dataset object
context: The operation's ApiContext
map: Elasticsearch index document map generator
@@ -760,19 +798,34 @@ def _bulk_dispatch(
ApiMethod.POST, ParamType.DATASET, params
).value

operation = (
Database.db_session.query(Operation)
.filter(
Operation.dataset_ref == dataset.id,
Operation.state == OperationState.WORKING,
)
.first()
)
if context["attributes"].require_stable and operation:
operations = Operation.by_state(dataset, OperationState.WORKING)
if context["attributes"].require_stable and operations:
raise APIAbort(
HTTPStatus.CONFLICT,
f"Dataset is working on {operation.name.name}",
f"Dataset is working on {','.join(o.name.name for o in operations)}",
)

component = context["attributes"].operation_name
try:
sync = Sync(logger=current_app.logger, component=component)
sync.update(dataset=dataset, state=OperationState.WORKING)
context["sync"] = sync
except Exception as e:
current_app.logger.warning(
"{} {} unable to set {} operational state: '{}'",
component,
dataset,
OperationState.WORKING.name,
e,
)
raise APIAbort(HTTPStatus.CONFLICT, "Unable to set operational state")

try:
self.prepare(params, dataset, context)
except APIAbort:
raise
except Exception as e:
raise APIInternalError(f"Prepare {dataset.name} error: '{e}'")

# If we don't have an Elasticsearch index map, then the dataset isn't
# indexed and we skip the Elasticsearch actions.
@@ -786,22 +839,17 @@
try:
results = helpers.streaming_bulk(
elastic,
self.generate_actions(params, dataset, context, map),
self.generate_actions(dataset, context, map),
raise_on_exception=False,
raise_on_error=False,
)
report = self._analyze_bulk(results, context)
except UnauthorizedAdminAccess as e:
raise APIAbort(e.http_status, str(e))
except MissingParameters as e:
raise APIAbort(e.http_status, str(e))
except Exception as e:
raise APIInternalError("Unexpected backend error") from e
elif context["attributes"].require_map and not Metadata.getvalue(
dataset, Metadata.SERVER_ARCHIVE
):
raise APIInternalError("Unexpected backend error '{e}'") from e
elif context["attributes"].require_map and self.expect_index(dataset):
# If the dataset has no index map, the bulk operation requires one,
# and the dataset isn't marked "archive only", fail.
# and we expect one to appear, fail rather than risking abandoning
# Elasticsearch documents.
raise APIAbort(
HTTPStatus.CONFLICT,
f"Operation unavailable: dataset {dataset.resource_id} is not indexed.",
@@ -820,11 +868,13 @@ def _bulk_dispatch(
# Let the subclass complete the operation
try:
self.complete(dataset, context, summary)
if "sync" in context:
context["sync"].update(dataset=dataset, state=OperationState.OK)
except Exception as e:
attributes["message"] = str(e)
auditing["status"] = AuditStatus.WARNING
auditing["reason"] = AuditReason.INTERNAL
raise APIInternalError("Unexpected completion error") from e
raise APIInternalError(f"Unexpected completion error '{e}'") from e

# Return the summary document as the success response, or abort with an
# internal error if we tried to operate on Elasticsearch documents but
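The base-class changes above split parameter validation (prepare) from
Elasticsearch action generation (generate_actions), while _bulk_dispatch now
owns the Sync WORKING/OK bookkeeping. A minimal hypothetical subclass under
that contract might look like the sketch below; MyBulkApi, the "tag"
parameter, and the IndexStream field names are invented for illustration, and
the import paths are assumed from the modules shown in the diff.

from pbench.server.api.resources.query_apis import ElasticBulkBase
from pbench.server.database.models.datasets import Dataset


class MyBulkApi(ElasticBulkBase):
    """Hypothetical subclass illustrating the refactored hook contract."""

    def prepare(self, params, dataset: Dataset, context):
        # Validate client parameters up front and stash results in the
        # context; _bulk_dispatch has already recorded the WORKING state and
        # will translate failures into APIAbort/APIInternalError.
        context["tag"] = params.query.get("tag")

    def generate_actions(self, dataset: Dataset, context, map):
        # Yield one Elasticsearch bulk action per indexed document; this runs
        # only when the dataset actually has an index map.
        for doc in map:
            yield {
                "_op_type": "update",
                "_index": doc.index,  # IndexStream field names are assumed
                "_id": doc.id,
                "doc": {"tag": context["tag"]},
            }

    def complete(self, dataset: Dataset, context, summary):
        # Database-side finalization; _bulk_dispatch then marks the operation
        # OK through context["sync"] unless the subclass removed it.
        dataset.update()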
59 changes: 32 additions & 27 deletions lib/pbench/server/api/resources/query_apis/dataset.py
@@ -19,13 +19,8 @@
import pbench.server.auth.auth as Auth
from pbench.server.cache_manager import CacheManager
from pbench.server.database.models.audit import AuditType
from pbench.server.database.models.datasets import (
Dataset,
OperationName,
OperationState,
)
from pbench.server.database.models.datasets import Dataset, OperationName
from pbench.server.database.models.index_map import IndexStream
from pbench.server.sync import Sync


class Datasets(ElasticBulkBase):
@@ -79,9 +74,33 @@ def __init__(self, config: PbenchServerConfig):
),
)

def prepare(self, params: ApiParams, dataset: Dataset, context: ApiContext):
"""Prepare for the bulk operation
Process and validate the API query parameters.
Args:
params: Type-normalized client request body JSON
dataset: The associated Dataset object
context: The operation's ApiContext
"""

if context["attributes"].action != "update":
return
access = params.query.get("access")
owner = params.query.get("owner")
if not access and not owner:
raise MissingParameters(["access", "owner"])
if access:
context["access"] = access
if owner:
authorized_user = Auth.token_auth.current_user()
if not authorized_user.is_admin():
raise UnauthorizedAdminAccess(authorized_user, OperationCode.UPDATE)
context["owner"] = owner

def generate_actions(
self,
params: ApiParams,
dataset: Dataset,
context: ApiContext,
map: Iterator[IndexStream],
@@ -91,7 +110,6 @@ def generate_actions(
dataset document map.
Args:
params: API parameters
dataset: the Dataset object
context: CONTEXT to pass to complete
map: Elasticsearch index document map generator
@@ -100,26 +118,10 @@
A generator for Elasticsearch bulk update actions
"""
action = context["attributes"].action

sync = Sync(
logger=current_app.logger, component=context["attributes"].operation_name
)
sync.update(dataset=dataset, state=OperationState.WORKING)
context["sync"] = sync
es_doc = {}

if action == "update":
access = params.query.get("access")
owner = params.query.get("owner")
if not access and not owner:
raise MissingParameters(["access", "owner"])
if access:
context["access"] = es_doc["access"] = access
if owner:
authorized_user = Auth.token_auth.current_user()
if not authorized_user.is_admin():
raise UnauthorizedAdminAccess(authorized_user, OperationCode.UPDATE)
context["owner"] = es_doc["owner"] = owner
for field in {"access", "owner"} & set(context):
es_doc[field] = context[field]

# Generate a series of bulk operations, which will be passed to
# the Elasticsearch bulk helper.
Expand Down Expand Up @@ -164,11 +166,14 @@ def complete(
attributes["owner"] = owner
dataset.owner_id = owner
dataset.update()
context["sync"].update(dataset=dataset, state=OperationState.OK)
elif action == "delete":
cache_m = CacheManager(self.config, current_app.logger)
cache_m.delete(dataset.resource_id)
dataset.delete()

# Tell caller not to update operational state for the deleted
# dataset.
del context["sync"]
else:
context["sync"].error(
dataset=dataset, message=f"Unable to {action} some indexed documents"
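One detail worth noting in the update path above: generate_actions now builds
the partial Elasticsearch document from whichever context fields prepare
actually stored. A tiny standalone illustration of that set-intersection
idiom, with made-up values:

context = {"access": "public", "sync": object(), "attributes": object()}
es_doc = {}
for field in {"access", "owner"} & set(context):
    es_doc[field] = context[field]
assert es_doc == {"access": "public"}  # no "owner": prepare() only stores it
                                       # when an admin supplied one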
58 changes: 53 additions & 5 deletions lib/pbench/server/database/models/datasets.py
@@ -420,11 +420,7 @@ def as_dict(self) -> Dict[str, Any]:
metadata_log = Metadata.get(self, Metadata.METALOG).value
except MetadataNotFound:
metadata_log = None
operations = (
Database.db_session.query(Operation)
.filter(Operation.dataset_ref == self.id)
.all()
)
operations = Operation.by_dataset(self)
return {
"access": self.access,
"name": self.name,
@@ -531,6 +527,58 @@ class Operation(Database.Base):
dataset_ref = Column(Integer, ForeignKey("datasets.id"))
dataset = relationship("Dataset", back_populates="operations")

@staticmethod
def by_dataset(dataset: Dataset) -> list["Operation"]:
"""Return all operational records for a dataset.
Args:
dataset: Dataset object
Returns:
a list (possibly empty) of operational records
"""
return (
Database.db_session.query(Operation)
.filter(Operation.dataset_ref == dataset.id)
.all()
)

@staticmethod
def by_operation(
dataset: Dataset, operation: OperationName
) -> Optional["Operation"]:
"""Return a specific operational record for a dataset.
Args:
dataset: Dataset object
operation: the target operation
Returns:
An operational record if it exists, else None
"""
return (
Database.db_session.query(Operation)
.filter(Operation.dataset_ref == dataset.id, Operation.name == operation)
.first()
)

@staticmethod
def by_state(dataset: Dataset, state: OperationState) -> list["Operation"]:
"""Return operational records in a specified state.
Args:
dataset: Dataset object
state: the target state
Returns:
A list (possibly empty) of operational records
"""
return (
Database.db_session.query(Operation)
.filter(Operation.dataset_ref == dataset.id, Operation.state == state)
.all()
)


class Metadata(Database.Base):
"""Retain secondary information about datasets
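The three Operation query helpers added above replace the ad-hoc
Database.db_session.query(...) calls elsewhere in this commit. A short sketch
of how they are used, mirroring the call sites in the diff (illustrative only;
the dataset argument is assumed to be an existing Dataset row):

from pbench.server.database.models.datasets import (
    Dataset,
    Operation,
    OperationName,
    OperationState,
)

def summarize_operations(dataset: Dataset) -> str:
    # Combine the three helpers the way this commit does.
    all_ops = Operation.by_dataset(dataset)  # as in Dataset.as_dict
    busy = Operation.by_state(dataset, OperationState.WORKING)  # require_stable check
    index_op = Operation.by_operation(dataset, OperationName.INDEX)  # expect_index
    if busy:
        return "working on " + ",".join(o.name.name for o in busy)
    if index_op and index_op.state is OperationState.FAILED:
        return "indexing failed"
    return f"{len(all_ops)} operations recorded"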
23 changes: 9 additions & 14 deletions lib/pbench/test/functional/server/test_datasets.py
@@ -772,25 +772,20 @@ class TestUpdate:
@pytest.mark.parametrize("access", ("public", "private"))
def test_publish(self, server_client: PbenchServerClient, login_user, access):
expected = "public" if access == "private" else "private"
datasets = server_client.get_list(
access=access, mine="true", metadata=["server.archiveonly"]
)
datasets = server_client.get_list(access=access, mine="true")
print(f" ... updating {access} datasets to {expected} ...")
for dataset in datasets:
response = server_client.update(
dataset.resource_id, access=expected, raise_error=False
)
print(f"\t ... updating {dataset.name} to {access!r}")
if response.ok:
assert not dataset.metadata["server.archiveonly"]
meta = server_client.get_metadata(
dataset.resource_id, metadata="dataset.access"
)
assert meta["dataset.access"] == expected
else:
assert dataset.metadata[
"server.archiveonly"
], f"Indexed dataset {dataset.name} failed to update with {response.json()['message']}"
print(f"\t ... updating {dataset.name} to {expected!r}")
assert (
response.ok
), f"Indexed dataset {dataset.name} failed to update with {response.json()['message']}"
meta = server_client.get_metadata(
dataset.resource_id, metadata="dataset.access"
)
assert meta["dataset.access"] == expected


class TestDelete:
