Commit
Handle duplicate tarball names with distinct MD5 (distributed-system-analysis#3552)

* Handle duplicate tarball names with distinct MD5

PBENCH-1276

Automated testing can easily launch similar tests simultaneously, resulting in
distinct tests with identical tarball names. The Pbench Server has maintained
result data keyed by tarball name: in 0.69, a duplicate name quietly overwrote
the earlier tarball, while the more recent 1.0 code explicitly rejects this
case.

Because we're seeing this fairly often in cloud testing environments, this PR
changes the server to gracefully accept duplicate names by storing each tarball
under an "isolation" directory layer named for its MD5 (resource ID), so that
identically named tarballs can no longer collide. The cache tree already has a
resource ID layer.

Add a functional test case using a tarball that was manually unpacked and
repacked with its `metadata.log` modified to resemble the likely real-world
differences (node name and fine-grained timestamp).
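
As an illustration of the new layout (a minimal sketch, not the server's own
code), the isolation layer moves the tarball and its `.md5` companion from the
controller directory into a subdirectory named for the dataset's MD5, so two
tarballs with the same name but different content land in different places.
The archive path and the second MD5 below are hypothetical values.

```python
from pathlib import Path


def isolated_paths(controller: Path, name: str, resource_id: str) -> tuple[Path, Path]:
    """Sketch: compute the isolated destinations for a tarball and its MD5.

    Old layout:  <ARCHIVE>/<controller>/<name>.tar.xz
    New layout:  <ARCHIVE>/<controller>/<resource_id>/<name>.tar.xz
    """
    isolator = controller / resource_id  # one directory per dataset MD5
    return isolator / f"{name}.tar.xz", isolator / f"{name}.tar.xz.md5"


# Two runs producing the same tarball name with distinct MD5s no longer collide.
archive = Path("/srv/pbench/archive/ctrl1")  # hypothetical controller directory
print(isolated_paths(archive, "fio_rw_2018.02.01T22.40.57", "08516cc7448035be2cc502f0517783fa"))
print(isolated_paths(archive, "fio_rw_2018.02.01T22.40.57", "d41d8cd98f00b204e9800998ecf8427e"))
```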
dbutenhof authored Sep 18, 2023
1 parent 7166e9f commit 5d8070f
Showing 9 changed files with 327 additions and 136 deletions.
154 changes: 101 additions & 53 deletions lib/pbench/server/cache_manager.py
@@ -292,12 +292,13 @@ class Tarball:
TAR_EXEC_TIMEOUT = 60.0
TAR_EXEC_WAIT = 0.02

def __init__(self, path: Path, controller: "Controller"):
def __init__(self, path: Path, resource_id: str, controller: "Controller"):
"""Construct a `Tarball` object instance
Args:
path: The file path to a discovered tarball (.tar.xz file) in the
configured ARCHIVE directory for a controller.
resource_id: The dataset resource ID
controller: The associated Controller object
"""
self.logger: Logger = controller.logger
@@ -307,14 +308,20 @@ def __init__(self, path: Path, controller: "Controller"):

# Record the Dataset resource ID (MD5) for coordination with the server
# logic
self.resource_id: str = get_tarball_md5(path)
self.resource_id: str = resource_id

# Record a backlink to the containing controller object
self.controller: Controller = controller

# Record the tarball isolation directory
self.isolator = controller.path / resource_id

# Record the path of the tarball file
self.tarball_path: Path = path

# Record the path of the companion MD5 file
self.md5_path: Path = path.with_suffix(".xz.md5")

# Record where cached unpacked data would live
self.cache: Path = controller.cache / self.resource_id

@@ -326,9 +333,6 @@ def __init__(self, path: Path, controller: "Controller"):
# inactive.
self.unpacked: Optional[Path] = None

# Record the path of the companion MD5 file
self.md5_path: Path = path.with_suffix(".xz.md5")

# Record the name of the containing controller
self.controller_name: str = controller.name

@@ -364,11 +368,21 @@ def check_unpacked(self):
# unpacked directory tree.

@classmethod
def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
def create(
cls, tarball: Path, resource_id: str, controller: "Controller"
) -> "Tarball":
"""An alternate constructor to import a tarball
This moves a new tarball into the proper place along with the md5
companion file. It returns the new Tarball object.
Args:
tarball: location of the tarball
resource_id: tarball resource ID
controller: associated controller object
Returns:
Tarball object
"""

# Validate the tarball suffix and extract the dataset name
@@ -378,8 +392,23 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
# standard .tar.xz
md5_source = tarball.with_suffix(".xz.md5")

destination = controller.path / tarball.name
md5_destination = controller.path / md5_source.name
# It's possible that two similar benchmark runs at about the same
# time can result in identical filenames with distinct MD5 values
# (for example, the same basic benchmark run on two hosts, which
# has been observed in automated cloud testing). To avoid problems, we
# "isolate" each tarball and its MD5 companion in a subdirectory with
# the md5 (resource_id) string to prevent collisions. The Tarball
# object maintains this, but we need it here, first, to move the
# files.
isolator = controller.path / resource_id

# NOTE: we enable "parents" and "exist_ok" not because we expect these
# conditions (both should be impossible) but because it's not worth an
# extra error check. We'll fail below if either *file* already
# exists in the isolator directory.
isolator.mkdir(parents=True, exist_ok=True)
destination = isolator / tarball.name
md5_destination = isolator / md5_source.name

# If either expected destination file exists, something is wrong
if destination.exists() or md5_destination.exists():
@@ -436,7 +465,7 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
except Exception as e:
controller.logger.error("Error removing staged MD5 {}: {}", name, e)

return cls(destination, controller)
return cls(destination, resource_id, controller)

def cache_map(self, dir_path: Path):
"""Builds Hierarchy structure of a Directory in a Dictionary
@@ -784,20 +813,31 @@ def delete(self):
files. There's nothing more we can do.
"""
self.uncache()
if self.md5_path:
try:
self.md5_path.unlink()
except Exception as e:
self.logger.error("archive unlink for {} failed with {}", self.name, e)
self.md5_path = None
if self.tarball_path:
if self.isolator and self.isolator.exists():
try:
self.tarball_path.unlink()
shutil.rmtree(self.isolator)
except Exception as e:
self.logger.error(
"archive MD5 unlink for {} failed with {}", self.name, e
)
self.logger.error("isolator delete for {} failed with {}", self.name, e)
self.isolator = None
self.tarball_path = None
self.md5_path = None
else:
if self.md5_path:
try:
self.md5_path.unlink()
except Exception as e:
self.logger.error(
"archive unlink for {} failed with {}", self.name, e
)
self.md5_path = None
if self.tarball_path:
try:
self.tarball_path.unlink()
except Exception as e:
self.logger.error(
"archive MD5 unlink for {} failed with {}", self.name, e
)
self.tarball_path = None


class Controller:
@@ -808,21 +848,6 @@ class Controller:
but the audit report generator will flag it.
"""

@staticmethod
def delete_if_empty(directory: Path) -> None:
"""Delete a directory only if it exists and is empty.
NOTE: rmdir technically will fail if the directory isn't empty, but
this feels safer.
Any exceptions raised will be propagated.
Args:
directory: Directory path
"""
if directory.exists() and not any(directory.iterdir()):
directory.rmdir()

def __init__(self, path: Path, cache: Path, logger: Logger):
"""Manage the representation of a controller archive on disk.
@@ -857,20 +882,37 @@ def __init__(self, path: Path, cache: Path, logger: Logger):
# constructor!
self._discover_tarballs()

def _add_if_tarball(self, file: Path, md5: Optional[str] = None):
"""Check for a tar file, and create an object
Args:
file: path of potential tarball
md5: known MD5 hash, or None to compute here
"""
if file.is_file() and Dataset.is_tarball(file):
hash = md5 if md5 else get_tarball_md5(file)
tarball = Tarball(file, hash, self)
self.tarballs[tarball.name] = tarball
self.datasets[tarball.resource_id] = tarball
tarball.check_unpacked()

def _discover_tarballs(self):
"""Discover the known tarballs
Look in the ARCHIVE tree's controller directory for tarballs, and add
them to the known set. We also check for unpacked directories in the
CACHE tree matching the resource_id of any tarballs we find in order
to link them.
them to the known set. "Old" tarballs may be at the top level, while "new"
tarballs live in "resource_id" isolation directories.
We also check for unpacked directories in the CACHE tree matching the
resource_id of any tarballs we find in order to link them.
"""
for file in self.path.iterdir():
if file.is_file() and Dataset.is_tarball(file):
tarball = Tarball(file, self)
self.tarballs[tarball.name] = tarball
self.datasets[tarball.resource_id] = tarball
tarball.check_unpacked()
if file.is_dir():
md5 = file.name
for tar in file.iterdir():
self._add_if_tarball(tar, md5)
else:
self._add_if_tarball(file)

@classmethod
def create(
@@ -896,7 +938,7 @@ def create_tarball(self, tarfile_path: Path) -> Tarball:
Returns:
Tarball object
"""
tarball = Tarball.create(tarfile_path, self)
tarball = Tarball.create(tarfile_path, get_tarball_md5(tarfile_path), self)
self.datasets[tarball.resource_id] = tarball
self.tarballs[tarball.name] = tarball
return tarball
@@ -1023,7 +1065,7 @@ def __init__(self, options: PbenchServerConfig, logger: Logger):
# resource_id.
self.datasets: dict[str, Tarball] = {}

def full_discovery(self):
def full_discovery(self) -> "CacheManager":
"""Discover the ARCHIVE and CACHE trees
NOTE: both _discover_unpacked() and _discover_results() rely on the
@@ -1033,6 +1075,7 @@ def full_discovery(self):
specific dataset.
"""
self._discover_controllers()
return self

def __contains__(self, dataset_id: str) -> bool:
"""Determine whether the cache manager includes a dataset.
@@ -1066,7 +1109,12 @@ def _clean_empties(self, controller: str):
controller: Name of the controller to clean up
"""
archive = self.options.ARCHIVE / controller
if archive.exists() and not any(archive.glob(f"*{Dataset.TARBALL_SUFFIX}")):

# Look for tarballs in the controller directory or a subdirectory. (NOTE: this
# provides compatibility with tarballs in an "isolator" subdirectory
# and older tarballs without isolators by using the recursive "**/"
# glob syntax.)
if archive.exists() and not any(archive.glob(f"**/*{Dataset.TARBALL_SUFFIX}")):
self.delete_if_empty(archive)
del self.controllers[controller]

@@ -1125,7 +1173,7 @@ def find_dataset(self, dataset_id: str) -> Tarball:
# and (if found) discover the controller containing that dataset.
for dir_entry in self.archive_root.iterdir():
if dir_entry.is_dir() and dir_entry.name != self.TEMPORARY:
for file in dir_entry.glob(f"*{Dataset.TARBALL_SUFFIX}"):
for file in dir_entry.glob(f"**/*{Dataset.TARBALL_SUFFIX}"):
md5 = get_tarball_md5(file)
if md5 == dataset_id:
self._add_controller(dir_entry)
@@ -1254,7 +1302,6 @@ def uncache(self, dataset_id: str):
tarball = self.find_dataset(dataset_id)
controller = tarball.controller
controller.uncache(dataset_id)
self._clean_empties(controller.name)

def delete(self, dataset_id: str):
"""Delete the dataset as well as unpacked artifacts.
@@ -1264,10 +1311,11 @@ def delete(self, dataset_id: str):
"""
try:
tarball = self.find_dataset(dataset_id)
name = tarball.name
tarball.controller.delete(dataset_id)
del self.datasets[dataset_id]
del self.tarballs[name]
self._clean_empties(tarball.controller_name)
except TarballNotFound:
return
name = tarball.name
tarball.controller.delete(dataset_id)
del self.datasets[dataset_id]
del self.tarballs[name]

self._clean_empties(tarball.controller_name)
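
The discovery and cleanup changes in this file rely on Python's recursive glob
to stay compatible with both layouts: since `**` matches zero or more directory
levels, `**/*.tar.xz` finds a pre-isolation tarball at the top of the
controller directory as well as an isolated one a level down. A minimal
standalone sketch of that trick (names here are illustrative, not the server's
API):

```python
from pathlib import Path

TARBALL_SUFFIX = ".tar.xz"  # mirrors Dataset.TARBALL_SUFFIX


def discover_tarballs(controller: Path) -> list[Path]:
    """Find tarballs in both the old (flat) and new (isolated) layouts.

    Because "**" matches zero or more directories, one glob covers both
    <controller>/<name>.tar.xz and <controller>/<md5>/<name>.tar.xz.
    """
    return sorted(p for p in controller.glob(f"**/*{TARBALL_SUFFIX}") if p.is_file())
```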
20 changes: 10 additions & 10 deletions lib/pbench/server/indexer.py
@@ -33,6 +33,7 @@
UnsupportedTarballFormat,
)
import pbench.server
from pbench.server.cache_manager import Tarball
from pbench.server.database.models.datasets import Dataset
from pbench.server.database.models.index_map import IndexMapType
from pbench.server.templates import PbenchTemplates
@@ -3190,23 +3191,22 @@ def __init__(
self,
idxctx: "IdxContext",
dataset: Dataset,
tbarg: str,
tmpdir: str,
extracted_root: str,
tarobj: Tarball,
):
"""Context for indexing a tarball.
Args:
idxctx: The controlling Elasticsearch (pyesbulk) indexing context
dataset: The Dataset object representing the tarball
tbarg: The filesystem path to the tarball (as a string)
tmpdir: The path to a temporary directory (as a string)
extracted_root: The path to the extracted tarball data (as a string)
tarobj: The cache manager tarball object
"""
self.idxctx = idxctx
self.authorization = {"owner": str(dataset.owner_id), "access": dataset.access}
self.tbname = tbarg
self.controller_dir = os.path.basename(os.path.dirname(self.tbname))
self.tarobj = tarobj
self.tbname = str(tarobj.tarball_path)
self.controller_dir = tarobj.controller.name
try:
self.satellite, self.controller_name = self.controller_dir.split("::", 1)
except Exception:
@@ -3265,18 +3265,18 @@ def __init__(
'{} - tar ball is missing "{}".'.format(self.tbname, metadata_log_path)
)

self.extracted_root = extracted_root
self.extracted_root = tarobj.cache
if not os.path.isdir(os.path.join(self.extracted_root, self.dirname)):
raise UnsupportedTarballFormat(
'{} - extracted tar ball directory "{}" does not'
" exist.".format(
self.tbname, os.path.join(self.extracted_root, self.dirname)
)
)
# Open the MD5 file of the tar ball and read the MD5 sum from it.
md5sum = open("%s.md5" % (self.tbname)).read().split()[0]

# Construct the @metadata and run metadata dictionaries from the
# metadata.log file.
md5sum = dataset.resource_id
self.mdconf = MetadataLog()
mdf = os.path.join(self.extracted_root, metadata_log_path)
try:
@@ -3429,7 +3429,7 @@ def __init__(
# using the controller directory, tar ball name (not path), and its
# MD5 value so that warnings, errors, and exceptions can have
# additional context to add.
self._tbctx = f"{self.controller_dir}/{os.path.basename(tbarg)}({md5sum})"
self._tbctx = f"{self.controller_dir}/{tarobj.name}({md5sum})"

def map_document(self, root_idx: str, index: str, id: str) -> None:
"""
5 changes: 1 addition & 4 deletions lib/pbench/server/indexing_tarballs.py
@@ -364,8 +364,6 @@ def sighup_handler(*args):
tarobj: Optional[Tarball] = None
tb_res = error_code["OK"]
try:
path = os.path.realpath(tb)

# Dynamically unpack the tarball for indexing.
try:
tarobj = self.cache_manager.unpack(dataset.resource_id)
@@ -374,7 +372,6 @@
"{} has not been unpacked", dataset
)
continue
unpacked = tarobj.cache # The indexer needs the root
except TarballNotFound as e:
self.sync.error(
dataset,
@@ -392,7 +389,7 @@

# "Open" the tar ball represented by the tar ball object
idxctx.logger.debug("open tar ball")
ptb = PbenchTarBall(idxctx, dataset, path, tmpdir, unpacked)
ptb = PbenchTarBall(idxctx, dataset, tmpdir, tarobj)

# Construct the generator for emitting all actions.
# The `idxctx` dictionary is passed along to each
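
Taken together, the indexer changes mean `PbenchTarBall` now derives its paths
and MD5 from the cache manager's `Tarball` object instead of receiving them as
separate string arguments, and the indexing loop simply passes the object
through (`PbenchTarBall(idxctx, dataset, tmpdir, tarobj)`). A rough sketch of
the derivation, using the attribute names shown in the diff above (the helper
itself is illustrative, not part of the code):

```python
from typing import Any


def pbench_tarball_inputs(tarobj: Any, dataset: Any) -> dict[str, str]:
    """Sketch: the values PbenchTarBall now derives from a Tarball object.

    `tarobj` is a cache manager Tarball and `dataset` a Dataset; the
    attribute names come from the diff, but this helper is hypothetical.
    """
    return {
        "tbname": str(tarobj.tarball_path),        # path of the .tar.xz file
        "controller_dir": tarobj.controller.name,  # no dirname parsing needed
        "extracted_root": str(tarobj.cache),       # root of the unpacked tree
        "md5sum": dataset.resource_id,             # no .md5 file read needed
    }
```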
Binary file not shown (the repacked test tarball, fio_rw_2018.02.01T22.40.57.tar.xz).
@@ -0,0 +1 @@
08516cc7448035be2cc502f0517783fa fio_rw_2018.02.01T22.40.57.tar.xz