Handle ENOSPC more cleanly (distributed-system-analysis#3513)
* Handle ENOSPC more cleanly

PBENCH-1239

In Pbench ops review, after staging the latest `main` with the intent of
testing the new `Tarball.extract` on a large dataset, we pushed a >7 GB `uperf`
tarball. The push failed with an internal error, leaving a partial tarball copy
in the `ARCHIVE` controller directory, and revealed several related problems:
1. The cache manager was using `shutil.copy2`, which copies the tarball from
the staging area into the archive tree. Because `nginx` also caches the entire
data stream, this *triples* the storage required to upload a tarball.
2. On copy failure, the cache manager did not delete the partial file.
3. While the initial data-stream save code handled `ENOSPC` specially, after
trouble mapping the status through Werkzeug it was reported as a generic
"server internal error", which is not ideal.
4. The MD5 file write was not similarly protected; while this is a small file
and `ENOSPC` is unlikely there, we should be prepared to handle it gracefully.

This PR changes the cache manager to use `shutil.move` (which was the original
intent) to avoid a third copy of the tarball, and unlinks the partial file on
failure. Both the initial tarball write and the MD5 write now handle `ENOSPC`
and return HTTP status 413 (request entity too large): not a perfect mapping,
but a standard error code that Werkzeug can handle. (A stand-alone sketch of
the pattern follows.)
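
As a rough illustration of the error-mapping pattern this PR adopts, here is a
minimal stand-alone sketch, not the actual Pbench code: the `APIAbort` class
and `save_upload` helper are illustrative stand-ins.

    import errno
    from http import HTTPStatus
    from pathlib import Path
    from typing import Iterable


    class APIAbort(Exception):
        """Stand-in for Pbench's APIAbort: an error with a client-visible status."""

        def __init__(self, status: HTTPStatus, message: str):
            super().__init__(message)
            self.status = status


    def save_upload(destination: Path, chunks: Iterable[bytes]) -> None:
        """Write an upload stream, mapping ENOSPC to 413 and cleaning up."""
        try:
            with destination.open("wb") as ofp:
                for chunk in chunks:
                    ofp.write(chunk)
        except OSError as exc:
            # Never leave a partial file behind on failure.
            destination.unlink(missing_ok=True)
            if exc.errno == errno.ENOSPC:
                # Werkzeug can't map status 509, so report the standard 413.
                raise APIAbort(
                    HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
                    f"Out of space on {destination.parent}",
                ) from exc
            raise  # anything else remains an internal server error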
dbutenhof authored Aug 7, 2023
1 parent 9b071de commit b4ef1dd
Showing 4 changed files with 259 additions and 48 deletions.
38 changes: 28 additions & 10 deletions lib/pbench/server/api/resources/intake_base.py
@@ -356,17 +356,17 @@ def _intake(
                     ofp.write(chunk)
                     hash_md5.update(chunk)
         except OSError as exc:
-            # NOTE: Werkzeug doesn't support status 509, so the abort call
-            # in _dispatch will fail. Rather than figure out how to fix
-            # that, just report as an internal error.
             if exc.errno == errno.ENOSPC:
-                msg = f"Out of space on {tar_full_path.root}"
-            else:
-                msg = f"Unexpected error {exc.errno} encountered during file upload"
-            raise APIInternalError(msg) from exc
+                raise APIAbort(
+                    HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                    f"Out of space on {tar_full_path.parent}",
+                )
+            raise APIInternalError(
+                f"Unexpected error encountered during file upload: {str(exc)!r}"
+            ) from exc
         except Exception as e:
             raise APIInternalError(
-                "Unexpected error encountered during file upload"
+                f"Unexpected error encountered during file upload: {str(e)!r}"
             ) from e

         if bytes_received != stream.length:
@@ -381,12 +381,21 @@ def _intake(
             )
 
         # From this point attempt to remove the MD5 file on error exit
-        recovery.add(md5_full_path.unlink)
+        recovery.add(lambda: md5_full_path.unlink(missing_ok=True))
         try:
             md5_full_path.write_text(f"{intake.md5} {filename}\n")
+        except OSError as exc:
+            if exc.errno == errno.ENOSPC:
+                raise APIAbort(
+                    HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                    f"Out of space on {md5_full_path.parent}",
+                )
+            raise APIInternalError(
+                f"Unexpected error encountered during MD5 creation: {str(exc)!r}"
+            ) from exc
         except Exception as e:
             raise APIInternalError(
-                f"Failed to write .md5 file '{md5_full_path}'",
+                f"Failed to write .md5 file '{md5_full_path}': {str(e)!r}",
             ) from e

         # Create a cache manager object
@@ -408,6 +417,15 @@ def _intake(
                 HTTPStatus.BAD_REQUEST,
                 f"Tarball {dataset.name!r} is invalid or missing required metadata.log: {exc}",
             ) from exc
+        except OSError as exc:
+            if exc.errno == errno.ENOSPC:
+                raise APIAbort(
+                    HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                    f"Out of space on {tar_full_path.parent}",
+                )
+            raise APIInternalError(
+                f"Unexpected error encountered during archive: {str(exc)!r}"
+            ) from exc
         except Exception as exc:
             raise APIInternalError(
                 f"Unable to create dataset in file system for {tar_full_path}: {exc}"
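
One subtlety in the hunks above: `recovery.add(md5_full_path.unlink)` became
`recovery.add(lambda: md5_full_path.unlink(missing_ok=True))` because the
recovery object stores bare callables to run on error exit, and a lambda is the
simplest way to bind the `missing_ok=True` argument so cleanup tolerates the
file never having been written. A minimal sketch of that deferred-cleanup
pattern; this `Recovery` class is a hypothetical stand-in for Pbench's helper:

    from pathlib import Path
    from typing import Callable


    class Recovery:
        """Hypothetical stand-in: run registered cleanup actions on error exit."""

        def __init__(self) -> None:
            self.actions: list[Callable[[], None]] = []

        def add(self, action: Callable[[], None]) -> None:
            self.actions.append(action)

        def cleanup(self) -> None:
            # Undo in reverse order of registration.
            for action in reversed(self.actions):
                action()


    recovery = Recovery()
    md5_path = Path("/tmp/example.md5")  # illustrative path
    # Bind missing_ok=True so cleanup succeeds even if the write never happened.
    recovery.add(lambda: md5_path.unlink(missing_ok=True))
    recovery.cleanup()  # removes the file if present; silent if missing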
38 changes: 24 additions & 14 deletions lib/pbench/server/cache_manager.py
@@ -378,35 +378,40 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
             # standard .tar.xz
             md5_source = tarball.with_suffix(".xz.md5")
 
+        destination = controller.path / tarball.name
+        md5_destination = controller.path / md5_source.name
+
         # If either expected destination file exists, something is wrong
-        if (controller.path / tarball.name).exists():
-            raise DuplicateTarball(name)
-        if (controller.path / md5_source.name).exists():
+        if destination.exists() or md5_destination.exists():
             raise DuplicateTarball(name)
 
-        # Copy the MD5 file first; only if that succeeds, copy the tarball
-        # itself.
+        # Move the MD5 file first; only if that succeeds, move the tarball
+        # itself. Note that we expect the source to be on the same
+        # filesystem as the ARCHIVE tree, and we want to avoid using double
+        # the space by copying large tarballs if the file can be moved.
         try:
-            md5_destination = Path(shutil.copy2(md5_source, controller.path))
+            shutil.move(md5_source, md5_destination)
         except Exception as e:
+            md5_destination.unlink(missing_ok=True)
             controller.logger.error(
-                "ERROR copying dataset {} ({}) MD5: {}", name, tarball, e
+                "ERROR moving dataset {} ({}) MD5: {}", name, tarball, e
             )
             raise
 
         try:
-            destination = Path(shutil.copy2(tarball, controller.path))
+            shutil.move(tarball, destination)
         except Exception as e:
             try:
-                md5_destination.unlink()
+                md5_destination.unlink(missing_ok=True)
             except Exception as md5_e:
                 controller.logger.error(
                     "Unable to recover by removing {} MD5 after tarball copy failure: {}",
                     name,
                     md5_e,
                 )
+            destination.unlink(missing_ok=True)
             controller.logger.error(
-                "ERROR copying dataset {} tarball {}: {}", name, tarball, e
+                "ERROR moving dataset {} tarball {}: {}", name, tarball, e
             )
             raise

@@ -419,12 +424,17 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
             # log it but do not abort
             controller.logger.error("Unable to set SELINUX context for {}: {}", name, e)
 
-        # If we were able to copy both files, remove the originals
+        # If we were able to copy both files, remove the originals. If we moved
+        # the files above, instead of copying them, these will no longer exist
+        # and we'll ignore that condition silently.
+        try:
+            tarball.unlink(missing_ok=True)
+        except Exception as e:
+            controller.logger.error("Error removing staged tarball {}: {}", name, e)
         try:
-            tarball.unlink()
-            md5_source.unlink()
+            md5_source.unlink(missing_ok=True)
         except Exception as e:
-            controller.logger.error("Error removing incoming dataset {}: {}", name, e)
+            controller.logger.error("Error removing staged MD5 {}: {}", name, e)
 
         return cls(destination, controller)
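
The space saving from `shutil.move` depends on the staging area and the
ARCHIVE tree sharing a filesystem: on the same filesystem the move degenerates
to a rename and needs no extra space, while across filesystems `shutil.move`
silently falls back to copy-and-delete, reintroducing the second copy. A quick
way to check that assumption (paths here are illustrative, not Pbench's real
layout):

    import os

    # st_dev identifies the filesystem; equal values mean shutil.move is
    # effectively an os.rename and consumes no additional space.
    staging = "/srv/pbench/staging"  # illustrative paths
    archive = "/srv/pbench/archive"
    same_fs = os.stat(staging).st_dev == os.stat(archive).st_dev
    print(f"shutil.move is a cheap rename: {same_fs}")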
74 changes: 70 additions & 4 deletions lib/pbench/test/unit/server/test_cache_manager.py
@@ -1,3 +1,4 @@
+import errno
 import hashlib
 import io
 from logging import Logger
@@ -244,17 +245,82 @@ def test_duplicate(
         monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata)
         cm = CacheManager(server_config, make_logger)
 
-        # Create a tarball file in the expected destination directory
+        # Create a tarball file in the expected destination directory: the
+        # subsequent create should report a duplicate.
         controller = cm.archive_root / "ABC"
         controller.mkdir()
         (controller / source_tarball.name).write_text("Send in the clones")
 
-        # Attempting to create a dataset from the md5 file should result in
-        # a duplicate dataset error
         with pytest.raises(DuplicateTarball) as exc:
             cm.create(source_tarball)
         assert exc.value.tarball == Dataset.stem(source_tarball)

+    @pytest.mark.parametrize(
+        "allow,errno",
+        (
+            (".md5", errno.ENOSPC),
+            (".md5", errno.EEXIST),
+            (".md5", None),
+            ("", errno.ENOSPC),
+            ("", errno.EACCES),
+            ("", None),
+        ),
+    )
+    def test_move_fails(
+        self,
+        monkeypatch,
+        selinux_disabled,
+        server_config,
+        make_logger,
+        tarball,
+        allow,
+        errno,
+    ):
+        src: list[Path] = []
+        dest: list[Path] = []
+        real_move = shutil.move
+
+        def mymove(source: Path, destination: Path, *args, **kwargs) -> Path:
+            src.append(source)
+            if destination.is_dir():
+                d = destination / source.name
+            else:
+                d = destination
+            dest.append(d)
+            if source.suffix == allow:
+                return real_move(source, destination, *args, **kwargs)
+            if errno:
+                e = OSError(errno, "something went badly")
+            else:
+                e = Exception("I broke")
+            raise e
+
+        ulink: list[Path] = []
+        ok: list[bool] = []
+
+        def unlink(self, missing_ok: bool = False):
+            ulink.append(self)
+            ok.append(missing_ok)
+
+        source_tarball, source_md5, md5 = tarball
+        monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata)
+        cm = CacheManager(server_config, make_logger)
+        monkeypatch.setattr("pbench.server.cache_manager.shutil.move", mymove)
+        monkeypatch.setattr(Path, "unlink", unlink)
+        with pytest.raises(Exception) as e:
+            cm.create(source_tarball)
+        if errno:
+            assert isinstance(e.value, OSError), f"Wanted OSError, got {type(e.value)}"
+            assert e.value.errno == errno
+        else:
+            assert str(e.value) == "I broke"
+        assert src == [source_md5] + ([source_tarball] if allow else [])
+        assert len(src) == len(dest) == len(ulink) == len(ok) == (2 if allow else 1)
+        for i, s in enumerate(src):
+            assert dest[i].name == s.name
+            assert ulink[i].name == s.name
+            assert dest[i] == ulink[i]
+        assert all(ok), f"Cleanup unlinks don't ignore missing: {ok}, {ulink}"

     def test_tarball_subprocess_run_with_exception(self, monkeypatch):
         """Test to check the subprocess_run functionality of the Tarball when
         an Exception occurred"""
