Move index-map out of metadata (distributed-system-analysis#3519)
* Move index-map out of metadata

PBENCH-462

This moves the Elasticsearch document index map out of the `server.index-map`
metadata key and gives it its own SQL table. It took more changes than I'd
expected to put this all back together. The bulk operation is refactored to drive a
`stream` generator off the SQLAlchemy `.all()` query instead of building,
returning, and then traversing a two-level `dict`; there's also a quick `exists`
check and an `indices` helper that returns a simple list of index names, again to
avoid processing a `dict`.
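
As a rough sketch of the new access patterns (helper names and call shapes are
taken from the diff below; this is not the implementation in `index_map.py`):

```python
# Sketch only: illustrates the exists/indices/stream helpers named above,
# using the call shapes visible in the diff; the real module may differ.
from pbench.server.database.models.datasets import Dataset
from pbench.server.database.models.index_map import IndexMap


def describe_index_state(dataset: Dataset) -> None:
    # Cheap membership test: has this dataset been indexed at all?
    if not IndexMap.exists(dataset):
        print(f"{dataset.name} is not indexed")
        return

    # Flat list of index names under the "run-data" root, suitable for
    # joining into an Elasticsearch query URI.
    print(IndexMap.indices(dataset, "run-data"))

    # Per-document records streamed straight from the SQLAlchemy query,
    # replacing the old two-level dict keyed by index name.
    for record in IndexMap.stream(dataset=dataset):
        print(record.index, record.id)
```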

There's also a utility script to convert existing `server.index-map` metadata into
`IndexMap` DB rows, which can be run one time on an upgrade. We'll probably
do this exactly twice: once for the staging server and then for the production
server.
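
The conversion script itself isn't shown in this view; a minimal sketch of the
idea, assuming the old metadata shape (a `dict` mapping index names to lists of
document IDs) and an `IndexMap` constructor matching the new table's columns —
the real utility may differ:

```python
# Hypothetical sketch, not the actual utility script.  Assumes the old
# "server.index-map" metadata value maps index names to lists of document
# IDs, and that IndexMap rows can be built with (dataset, root, index,
# documents) as suggested by the new table's columns.
from sqlalchemy.orm import Session

from pbench.server.database.models.datasets import Dataset, Metadata
from pbench.server.database.models.index_map import IndexMap


def convert_index_maps(session: Session) -> None:
    for dataset in session.query(Dataset).all():
        index_map = Metadata.getvalue(dataset=dataset, key="server.index-map")
        if not index_map:
            continue
        for index, doc_ids in index_map.items():
            # Index names look like "drb.v6.run-data.2021-07"; the root
            # ("run-data") is the next-to-last dotted component.
            root = index.split(".")[-2]
            session.add(
                IndexMap(dataset=dataset, root=root, index=index, documents=doc_ids)
            )
    session.commit()
```
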
dbutenhof authored Aug 22, 2023
1 parent cc35a0c commit bbd948e
Showing 25 changed files with 990 additions and 212 deletions.
4 changes: 0 additions & 4 deletions docs/Server/API/V1/list.md
@@ -391,10 +391,6 @@ will return a JSON document with the keys `config`, `date`, `hostname_f`,
},
"server": {
"deletion": null,
"index-map": {
"container-pbench.v6.run-data.2023-03": null,
"container-pbench.v6.run-toc.2023-03": null
},
"origin": null,
"tarball-path": null
}
4 changes: 2 additions & 2 deletions docs/Server/API/metadata.md
@@ -111,8 +111,8 @@ client.

This defines internal Pbench Server management state related to a dataset
that's not inherent to the representation of the user's performance metrics.
These are generally not useful to clients, and some can be large. There are
three values in this namespace that clients can modify:
These are generally not useful to clients, but there are three values in this
namespace that clients can modify:

#### `server.deletion`

20 changes: 9 additions & 11 deletions lib/pbench/server/api/resources/intake_base.py
@@ -357,10 +357,14 @@ def _intake(
hash_md5.update(chunk)
except OSError as exc:
if exc.errno == errno.ENOSPC:
raise APIAbort(
HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
f"Out of space on {tar_full_path.parent}",
usage = shutil.disk_usage(tar_full_path.parent)
current_app.logger.error(
"Archive filesystem is {:.3}% full, upload failure {} ({})",
float(usage.used) / float(usage.total) * 100.0,
dataset.name,
humanize.naturalsize(stream.length),
)
raise APIAbort(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "Out of space")
raise APIInternalError(
f"Unexpected error encountered during file upload: {str(exc)!r} "
) from exc
@@ -386,10 +390,7 @@ def _intake(
md5_full_path.write_text(f"{intake.md5} {filename}\n")
except OSError as exc:
if exc.errno == errno.ENOSPC:
raise APIAbort(
HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
f"Out of space on {md5_full_path.parent}",
)
raise APIAbort(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "Out of space")
raise APIInternalError(
f"Unexpected error encountered during MD5 creation: {str(exc)!r}"
) from exc
@@ -419,10 +420,7 @@ def _intake(
) from exc
except OSError as exc:
if exc.errno == errno.ENOSPC:
raise APIAbort(
HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
f"Out of space on {tar_full_path.parent}",
)
raise APIAbort(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "Out of space")
raise APIInternalError(
f"Unexpected error encountered during archive: {str(exc)!r}"
) from exc
21 changes: 12 additions & 9 deletions lib/pbench/server/api/resources/query_apis/__init__.py
@@ -10,7 +10,7 @@

from dateutil import rrule
from dateutil.relativedelta import relativedelta
from elasticsearch import Elasticsearch, helpers, VERSION
from elasticsearch import Elasticsearch, helpers
from flask import current_app, jsonify
from flask.wrappers import Response
import requests
@@ -40,6 +40,7 @@
Operation,
OperationState,
)
from pbench.server.database.models.index_map import IndexMap, IndexStream
from pbench.server.database.models.templates import Template
from pbench.server.database.models.users import User

@@ -572,7 +573,7 @@ def generate_actions(
params: ApiParams,
dataset: Dataset,
context: ApiContext,
map: dict[str, list[str]],
map: Iterator[IndexStream],
) -> Iterator[dict]:
"""
Generate a series of Elasticsearch bulk operation actions driven by the
@@ -591,7 +592,7 @@
params: Type-normalized client request body JSON
dataset: The associated Dataset object
context: The operation's ApiContext
map: Elasticsearch index document map
map: Elasticsearch index document map generator
Returns:
Sequence of Elasticsearch bulk action dict objects
@@ -773,14 +774,12 @@ def _bulk_dispatch(
f"Dataset is working on {operation.name.name}",
)

map = Metadata.getvalue(dataset=dataset, key=Metadata.INDEX_MAP)

# If we don't have an Elasticsearch index-map, then the dataset isn't
# If we don't have an Elasticsearch index map, then the dataset isn't
# indexed and we skip the Elasticsearch actions.
if map:
if IndexMap.exists(dataset):
# Build an Elasticsearch instance to manage the bulk update
elastic = Elasticsearch(self.elastic_uri)
current_app.logger.info("Elasticsearch {} [{}]", elastic, VERSION)
map = IndexMap.stream(dataset=dataset)

# NOTE: because both generate_actions and streaming_bulk return
# generators, the entire sequence is inside a single try block.
@@ -798,7 +797,11 @@
raise APIAbort(e.http_status, str(e))
except Exception as e:
raise APIInternalError("Unexpected backend error") from e
elif context["attributes"].require_map:
elif context["attributes"].require_map and not Metadata.getvalue(
dataset, Metadata.SERVER_ARCHIVE
):
# If the dataset has no index map, the bulk operation requires one,
# and the dataset isn't marked "archive only", fail.
raise APIAbort(
HTTPStatus.CONFLICT,
f"Operation unavailable: dataset {dataset.resource_id} is not indexed.",
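
For reference, a minimal sketch (not the server's `_bulk_dispatch`, which also
handles reporting, auditing, and operation-state updates) of how the streamed
records feed `helpers.streaming_bulk`:

```python
# Sketch only: streams "delete" actions for every indexed document of a
# dataset.  Error handling is simplified; the real dispatcher wraps this
# in reporting and operation-state bookkeeping.
from elasticsearch import Elasticsearch, helpers

from pbench.server.database.models.index_map import IndexMap


def delete_dataset_documents(elastic_uri: str, dataset) -> int:
    elastic = Elasticsearch(elastic_uri)
    actions = (
        {"_op_type": "delete", "_index": i.index, "_id": i.id}
        for i in IndexMap.stream(dataset=dataset)
    )
    errors = 0
    # streaming_bulk consumes the generator lazily, yielding (ok, item)
    # for each document, so nothing is materialized up front.
    for ok, _item in helpers.streaming_bulk(elastic, actions, raise_on_error=False):
        if not ok:
            errors += 1
    return errors
```
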
16 changes: 8 additions & 8 deletions lib/pbench/server/api/resources/query_apis/dataset.py
@@ -24,6 +24,7 @@
OperationName,
OperationState,
)
from pbench.server.database.models.index_map import IndexStream
from pbench.server.sync import Sync


@@ -83,7 +84,7 @@ def generate_actions(
params: ApiParams,
dataset: Dataset,
context: ApiContext,
map: dict[str, list[str]],
map: Iterator[IndexStream],
) -> Iterator[dict]:
"""
Generate a series of Elasticsearch bulk update actions driven by the
@@ -93,7 +94,7 @@
params: API parameters
dataset: the Dataset object
context: CONTEXT to pass to complete
map: Elasticsearch index document map
map: Elasticsearch index document map generator
Returns:
A generator for Elasticsearch bulk update actions
@@ -127,12 +128,11 @@ def generate_actions(
# the "access" and/or "owner" field(s) of the "authorization" subdocument:
# no other data will be modified.

for index, ids in map.items():
for id in ids:
es_action = {"_op_type": action, "_index": index, "_id": id}
if es_doc:
es_action["doc"] = es_doc
yield es_action
for i in map:
es_action = {"_op_type": action, "_index": i.index, "_id": i.id}
if es_doc:
es_action["doc"] = es_doc
yield es_action

def complete(
self, dataset: Dataset, context: ApiContext, summary: JSONOBJECT
37 changes: 16 additions & 21 deletions lib/pbench/server/api/resources/query_apis/datasets/__init__.py
@@ -1,21 +1,19 @@
from http import HTTPStatus
from typing import AnyStr, List, NoReturn, Union

from flask import current_app

from pbench.server import JSON, PbenchServerConfig
from pbench.server.api.resources import (
APIAbort,
ApiAuthorizationType,
ApiContext,
APIInternalError,
ApiParams,
ApiSchema,
ParamType,
SchemaError,
)
from pbench.server.api.resources.query_apis import ElasticBase
from pbench.server.database.models.datasets import Dataset, Metadata, MetadataError
from pbench.server.database.models.datasets import Dataset, Metadata
from pbench.server.database.models.index_map import IndexMap
from pbench.server.database.models.templates import Template


@@ -114,9 +112,19 @@ def preprocess(self, params: ApiParams, context: ApiContext) -> NoReturn:
)
context["dataset"] = dataset

def get_index(self, dataset: Dataset, root_index_name: AnyStr) -> AnyStr:
"""Retrieve the list of ES indices from the metadata table based on a
given root_index_name.
def get_index(self, dataset: Dataset, root_index_name: str) -> str:
"""Retrieve ES indices based on a given root_index_name.
Args:
dataset: dataset object
root_index_name: A root index name like "run-data"
Raises:
APIAbort(CONFLICT) if indexing was disabled on the target dataset.
Returns:
A string that joins all selected indices with ",", suitable for use
in an Elasticsearch query URI.
"""

# Datasets marked "archiveonly" aren't indexed, and can't be referenced
Expand All @@ -125,21 +133,8 @@ def get_index(self, dataset: Dataset, root_index_name: AnyStr) -> AnyStr:
if Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE):
raise APIAbort(HTTPStatus.CONFLICT, "Dataset indexing was disabled")

try:
index_map = Metadata.getvalue(dataset=dataset, key=Metadata.INDEX_MAP)
except MetadataError as exc:
raise APIInternalError(
f"Required metadata {Metadata.INDEX_MAP} missing"
) from exc

if index_map is None:
raise APIInternalError(
f"Required metadata {Metadata.INDEX_MAP} has no value"
)

index_keys = [key for key in index_map if root_index_name in key]
index_keys = IndexMap.indices(dataset, root_index_name)
indices = ",".join(index_keys)
current_app.logger.debug(f"Indices from metadata , {indices!r}")
return indices

def get_aggregatable_fields(
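
A tiny illustration of where the comma-joined string ends up (host, index names,
and query body are made up; the server itself forwards queries through its
`ElasticBase` machinery rather than calling Elasticsearch directly like this):

```python
# Illustrative only: a comma-separated index list drops straight into the
# path of an Elasticsearch search URI.  Everything here is made up.
import requests

indices = "drb.v6.run-data.2021-07,drb.v6.run-data.2021-08"
response = requests.post(
    f"http://elasticsearch.example.com:9200/{indices}/_search",
    json={"query": {"match_all": {}}},
)
print(response.json()["hits"]["total"])
```
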
1 change: 1 addition & 0 deletions lib/pbench/server/database/__init__.py
@@ -7,6 +7,7 @@
from pbench.server.database.models.api_keys import APIKey # noqa F401
from pbench.server.database.models.audit import Audit # noqa F401
from pbench.server.database.models.datasets import Dataset, Metadata # noqa F401
from pbench.server.database.models.index_map import IndexMap # noqa F401
from pbench.server.database.models.server_settings import ServerSetting # noqa F401
from pbench.server.database.models.templates import Template # noqa F401
from pbench.server.database.models.users import User # noqa F401
New file: Alembic migration for the `indexmaps` table (file path not shown in this view)
@@ -0,0 +1,39 @@
"""Separate index map from metadata
Revision ID: 313cfbf6e74b
Revises: 1a91bc68d6de
Create Date: 2023-08-10 20:31:22.937542
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "313cfbf6e74b"
down_revision = "1a91bc68d6de"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.create_table(
"indexmaps",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("dataset_ref", sa.Integer(), nullable=False),
sa.Column("root", sa.String(length=255), nullable=False),
sa.Column("index", sa.String(length=255), nullable=False),
sa.Column("documents", sa.JSON(), nullable=False),
sa.ForeignKeyConstraint(
["dataset_ref"],
["datasets.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_indexmaps_index"), "indexmaps", ["index"], unique=False)
op.create_index(op.f("ix_indexmaps_root"), "indexmaps", ["root"], unique=False)


def downgrade() -> None:
op.drop_index(op.f("ix_indexmaps_root"), table_name="indexmaps")
op.drop_index(op.f("ix_indexmaps_index"), table_name="indexmaps")
op.drop_table("indexmaps")
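
The model module isn't included in this excerpt; a sketch of a SQLAlchemy model
consistent with the migration above (column names and the `indices`/`dataset`
relationship come from the diff; everything else, including the import paths, is
assumed):

```python
# Hypothetical sketch matching the "indexmaps" table above; the real
# IndexMap model (with its exists/indices/stream helpers and the
# IndexStream record type) is not shown here and may differ.
from sqlalchemy import JSON, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from pbench.server.database.database import Database


class IndexMap(Database.Base):
    """One row per (dataset, root, index), holding that index's document IDs."""

    __tablename__ = "indexmaps"

    id = Column(Integer, primary_key=True, autoincrement=True)
    dataset_ref = Column(Integer, ForeignKey("datasets.id"), nullable=False)
    root = Column(String(255), index=True, nullable=False)
    index = Column(String(255), index=True, nullable=False)
    documents = Column(JSON, nullable=False)

    dataset = relationship("Dataset", back_populates="indices")
```
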
14 changes: 3 additions & 11 deletions lib/pbench/server/database/models/datasets.py
@@ -323,6 +323,9 @@ class Dataset(Database.Base):
operations = relationship(
"Operation", back_populates="dataset", cascade="all, delete-orphan"
)
indices = relationship(
"IndexMap", back_populates="dataset", cascade="all, delete-orphan"
)

TARBALL_SUFFIX = ".tar.xz"

@@ -641,17 +644,6 @@ class Metadata(Database.Base):
# }
TARBALL_PATH = "server.tarball-path"

# INDEX_MAP a dict recording the set of MD5 document IDs for each
# Elasticsearch index that contains documents for this dataset.
#
# {
# "server.index-map": {
# "drb.v6.run-data.2021-07": ["MD5"],
# "drb.v6.run-toc.2021-07": ["MD5-1", "MD5-2"]
# }
# }
INDEX_MAP = "server.index-map"

# --- Standard Metadata keys

# Metadata keys that clients can update
(The remaining changed files are not shown in this view.)
