From b4ef1dd66b047222dab6c7fcb1033f1714c4dced Mon Sep 17 00:00:00 2001
From: David Butenhof
Date: Mon, 7 Aug 2023 17:23:32 -0400
Subject: [PATCH] Handle ENOSPC more cleanly (#3513)

* Handle ENOSPC more cleanly

PBENCH-1239

In Pbench ops review, after staging the latest `main` and with the intent of
testing the new `Tarball.extract` on a large dataset, we pushed a >7GB `uperf`
tarball. This failed with an internal error, leaving a partial tarball copy in
the `ARCHIVE` controller directory, revealing several related problems:

1. The cache manager was using `shutil.copy2`, which copies the tarball from
   the staging area into the archive tree. Because `nginx` also caches the
   entire data stream, this *triples* the storage requirements to upload a
   tarball.
2. On copy failure, the cache manager did not delete the partial file.
3. While the initial data stream save code handled `ENOSPC` specially, a
   status mapping problem in Werkzeug meant it was still reported as a generic
   "internal server error", which is not ideal.
4. The MD5 file write was not similarly protected; while this is a small file
   and `ENOSPC` is unlikely, we should be prepared to handle it gracefully.

This PR changes the cache manager to use `shutil.move` (which was the original
intent) to avoid a third copy of the tarball. On failure, we unlink the
partial file. Both the initial tarball write and the MD5 write now handle
`ENOSPC` and return HTTP status 413 (request entity too large), which is not a
perfect mapping but is a standard error code that Werkzeug can handle.
---
 .../server/api/resources/intake_base.py      |  38 +++--
 lib/pbench/server/cache_manager.py           |  38 +++--
 .../test/unit/server/test_cache_manager.py   |  74 ++++++++-
 lib/pbench/test/unit/server/test_upload.py   | 157 +++++++++++++++---
 4 files changed, 259 insertions(+), 48 deletions(-)

diff --git a/lib/pbench/server/api/resources/intake_base.py b/lib/pbench/server/api/resources/intake_base.py
index c7284bf797..21ae865a66 100644
--- a/lib/pbench/server/api/resources/intake_base.py
+++ b/lib/pbench/server/api/resources/intake_base.py
@@ -356,17 +356,17 @@ def _intake(
                         ofp.write(chunk)
                         hash_md5.update(chunk)
             except OSError as exc:
-                # NOTE: Werkzeug doesn't support status 509, so the abort call
-                # in _dispatch will fail. Rather than figure out how to fix
-                # that, just report as an internal error.
                 if exc.errno == errno.ENOSPC:
-                    msg = f"Out of space on {tar_full_path.root}"
-                else:
-                    msg = f"Unexpected error {exc.errno} encountered during file upload"
-                raise APIInternalError(msg) from exc
+                    raise APIAbort(
+                        HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                        f"Out of space on {tar_full_path.parent}",
+                    )
+                raise APIInternalError(
+                    f"Unexpected error encountered during file upload: {str(exc)!r}"
+                ) from exc
             except Exception as e:
                 raise APIInternalError(
-                    "Unexpected error encountered during file upload"
+                    f"Unexpected error encountered during file upload: {str(e)!r}"
                 ) from e
 
             if bytes_received != stream.length:
@@ -381,12 +381,21 @@
                 )
 
             # From this point attempt to remove the MD5 file on error exit
-            recovery.add(md5_full_path.unlink)
+            recovery.add(lambda: md5_full_path.unlink(missing_ok=True))
             try:
                 md5_full_path.write_text(f"{intake.md5} {filename}\n")
+            except OSError as exc:
+                if exc.errno == errno.ENOSPC:
+                    raise APIAbort(
+                        HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                        f"Out of space on {md5_full_path.parent}",
+                    )
+                raise APIInternalError(
+                    f"Unexpected error encountered during MD5 creation: {str(exc)!r}"
+                ) from exc
             except Exception as e:
                 raise APIInternalError(
-                    f"Failed to write .md5 file '{md5_full_path}'",
+                    f"Failed to write .md5 file '{md5_full_path}': {str(e)!r}",
                 ) from e
 
             # Create a cache manager object
@@ -408,6 +417,15 @@
                     HTTPStatus.BAD_REQUEST,
                     f"Tarball {dataset.name!r} is invalid or missing required metadata.log: {exc}",
                 ) from exc
+            except OSError as exc:
+                if exc.errno == errno.ENOSPC:
+                    raise APIAbort(
+                        HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                        f"Out of space on {tar_full_path.parent}",
+                    )
+                raise APIInternalError(
+                    f"Unexpected error encountered during archive: {str(exc)!r}"
+                ) from exc
             except Exception as exc:
                 raise APIInternalError(
                     f"Unable to create dataset in file system for {tar_full_path}: {exc}"
diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py
index 8de5ff964b..8382460362 100644
--- a/lib/pbench/server/cache_manager.py
+++ b/lib/pbench/server/cache_manager.py
@@ -378,35 +378,40 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
         # standard .tar.xz
         md5_source = tarball.with_suffix(".xz.md5")
 
+        destination = controller.path / tarball.name
+        md5_destination = controller.path / md5_source.name
+
         # If either expected destination file exists, something is wrong
-        if (controller.path / tarball.name).exists():
-            raise DuplicateTarball(name)
-        if (controller.path / md5_source.name).exists():
+        if destination.exists() or md5_destination.exists():
             raise DuplicateTarball(name)
 
-        # Copy the MD5 file first; only if that succeeds, copy the tarball
-        # itself.
+        # Move the MD5 file first; only if that succeeds, move the tarball
+        # itself. Note that we expect the source to be on the same
+        # filesystem as the ARCHIVE tree, and we want to avoid using double
+        # the space by copying large tarballs if the file can be moved.
         try:
-            md5_destination = Path(shutil.copy2(md5_source, controller.path))
+            shutil.move(md5_source, md5_destination)
         except Exception as e:
+            md5_destination.unlink(missing_ok=True)
             controller.logger.error(
-                "ERROR copying dataset {} ({}) MD5: {}", name, tarball, e
+                "ERROR moving dataset {} ({}) MD5: {}", name, tarball, e
             )
             raise
 
         try:
-            destination = Path(shutil.copy2(tarball, controller.path))
+            shutil.move(tarball, destination)
         except Exception as e:
             try:
-                md5_destination.unlink()
+                md5_destination.unlink(missing_ok=True)
             except Exception as md5_e:
                 controller.logger.error(
                     "Unable to recover by removing {} MD5 after tarball copy failure: {}",
                     name,
                     md5_e,
                 )
+            destination.unlink(missing_ok=True)
             controller.logger.error(
-                "ERROR copying dataset {} tarball {}: {}", name, tarball, e
+                "ERROR moving dataset {} tarball {}: {}", name, tarball, e
             )
             raise
 
@@ -419,12 +424,17 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
             # log it but do not abort
             controller.logger.error("Unable to set SELINUX context for {}: {}", name, e)
 
-        # If we were able to copy both files, remove the originals
+        # If we were able to copy both files, remove the originals. If we moved
+        # the files above, instead of copying them, these will no longer exist
+        # and we'll ignore that condition silently.
+        try:
+            tarball.unlink(missing_ok=True)
+        except Exception as e:
+            controller.logger.error("Error removing staged tarball {}: {}", name, e)
         try:
-            tarball.unlink()
-            md5_source.unlink()
+            md5_source.unlink(missing_ok=True)
         except Exception as e:
-            controller.logger.error("Error removing incoming dataset {}: {}", name, e)
+            controller.logger.error("Error removing staged MD5 {}: {}", name, e)
 
         return cls(destination, controller)
 
diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py
index d483fa5153..b108f99521 100644
--- a/lib/pbench/test/unit/server/test_cache_manager.py
+++ b/lib/pbench/test/unit/server/test_cache_manager.py
@@ -1,3 +1,4 @@
+import errno
 import hashlib
 import io
 from logging import Logger
@@ -244,17 +245,82 @@ def test_duplicate(
         monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata)
         cm = CacheManager(server_config, make_logger)
 
-        # Create a tarball file in the expected destination directory
+        # Create a tarball file in the expected destination directory: the
+        # subsequent create should report a duplicate.
         controller = cm.archive_root / "ABC"
         controller.mkdir()
         (controller / source_tarball.name).write_text("Send in the clones")
-
-        # Attempting to create a dataset from the md5 file should result in
-        # a duplicate dataset error
         with pytest.raises(DuplicateTarball) as exc:
             cm.create(source_tarball)
         assert exc.value.tarball == Dataset.stem(source_tarball)
 
+    @pytest.mark.parametrize(
+        "allow,errno",
+        (
+            (".md5", errno.ENOSPC),
+            (".md5", errno.EEXIST),
+            (".md5", None),
+            ("", errno.ENOSPC),
+            ("", errno.EACCES),
+            ("", None),
+        ),
+    )
+    def test_move_fails(
+        self,
+        monkeypatch,
+        selinux_disabled,
+        server_config,
+        make_logger,
+        tarball,
+        allow,
+        errno,
+    ):
+        src: list[Path] = []
+        dest: list[Path] = []
+        real_move = shutil.move
+
+        def mymove(source: Path, destination: Path, *args, **kwargs) -> Path:
+            src.append(source)
+            if destination.is_dir():
+                d = destination / source.name
+            else:
+                d = destination
+            dest.append(d)
+            if source.suffix == allow:
+                return real_move(source, destination, *args, **kwargs)
+            if errno:
+                e = OSError(errno, "something went badly")
+            else:
+                e = Exception("I broke")
+            raise e
+
+        ulink: list[Path] = []
+        ok: list[bool] = []
+
+        def unlink(self, missing_ok: bool = False):
+            ulink.append(self)
+            ok.append(missing_ok)
+
+        source_tarball, source_md5, md5 = tarball
+        monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata)
+        cm = CacheManager(server_config, make_logger)
+        monkeypatch.setattr("pbench.server.cache_manager.shutil.move", mymove)
+        monkeypatch.setattr(Path, "unlink", unlink)
+        with pytest.raises(Exception) as e:
+            cm.create(source_tarball)
+        if errno:
+            assert isinstance(e.value, OSError), f"Wanted OSError, got {type(e.value)}"
+            assert e.value.errno == errno
+        else:
+            assert str(e.value) == "I broke"
+        assert src == [source_md5] + ([source_tarball] if allow else [])
+        assert len(src) == len(dest) == len(ulink) == len(ok) == (2 if allow else 1)
+        for i, s in enumerate(src):
+            assert dest[i].name == s.name
+            assert ulink[i].name == s.name
+            assert dest[i] == ulink[i]
+        assert all(ok), f"Cleanup unlinks don't ignore missing: {ok}, {ulink}"
+
     def test_tarball_subprocess_run_with_exception(self, monkeypatch):
         """Test to check the subprocess_run functionality of the Tarball
         when an Exception occurred"""
diff --git a/lib/pbench/test/unit/server/test_upload.py b/lib/pbench/test/unit/server/test_upload.py
index bd125ff7bf..01c86e31be 100644
--- a/lib/pbench/test/unit/server/test_upload.py
+++ b/lib/pbench/test/unit/server/test_upload.py
@@ -22,8 +22,10 @@
     Dataset,
     DatasetNotFound,
     Metadata,
-    MetadataProtectedKey,
+    MetadataMissingParameter,
+    MetadataSqlError,
 )
+from pbench.server.database.models.users import User
 from pbench.test.unit.server import DRB_USER_ID
 
@@ -260,18 +262,32 @@ def test_bad_extension_upload(
         self.verify_logs(caplog)
         assert not self.cachemanager_created
 
-    @pytest.mark.parametrize("error", (errno.ENOSPC, errno.ENFILE, None))
+    @pytest.mark.parametrize(
+        "error,http_status,message",
+        (
+            (errno.ENOSPC, HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "Out of space on "),
+            (
+                errno.ENFILE,
+                HTTPStatus.INTERNAL_SERVER_ERROR,
+                "Internal Pbench Server Error:",
+            ),
+            (None, HTTPStatus.INTERNAL_SERVER_ERROR, "Internal Pbench Server Error:"),
+        ),
+    )
     def test_bad_stream_read(
-        self, client, server_config, pbench_drb_token, monkeypatch, error
+        self,
+        client,
+        server_config,
+        pbench_drb_token,
+        monkeypatch,
+        error,
+        http_status,
+        message,
     ):
         """Test handling of errors from the intake stream read
 
-        The intake code handles errno.ENOSPC specially; however although the
-        code tried to raise an APIAbort with HTTPStatus.INSUFFICIENT_SPACE
-        (50), the werkzeug abort() doesn't support this and ends up with
-        a generic internal server error. Instead, we now have three distinct
-        cases which all result (to the client) in identical internal server
-        errors. Nevertheless, we exercise all three cases here.
+        The intake code reports errno.ENOSPC with 413/REQUEST_ENTITY_TOO_LARGE,
+        but other file create errors are reported as 500/INTERNAL_SERVER_ERROR.
         """
         stream = BytesIO(b"12345")
@@ -294,10 +310,71 @@ def read(self):
                 data=data_fp,
                 headers=self.gen_headers(pbench_drb_token, "md5sum"),
             )
-        assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
-        assert response.json.get("message").startswith(
-            "Internal Pbench Server Error: log reference "
-        )
+        assert response.status_code == http_status
+        assert response.json.get("message").startswith(message)
+
+    @pytest.mark.parametrize(
+        "error,http_status,message",
+        (
+            (errno.ENOSPC, HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "Out of space on "),
+            (
+                errno.ENFILE,
+                HTTPStatus.INTERNAL_SERVER_ERROR,
+                "Internal Pbench Server Error:",
+            ),
+            (None, HTTPStatus.INTERNAL_SERVER_ERROR, "Internal Pbench Server Error:"),
+        ),
+    )
+    def test_md5_failure(
+        self,
+        monkeypatch,
+        client,
+        pbench_drb_token,
+        server_config,
+        tarball,
+        error,
+        http_status,
+        message,
+    ):
+        """Test handling of errors from MD5 file creation.
+
+        The intake code reports errno.ENOSPC with 413/REQUEST_ENTITY_TOO_LARGE,
+        but other file create errors are reported as 500/INTERNAL_SERVER_ERROR.
+        """
+        path: Optional[Path] = None
+
+        def nogood_write(
+            self, data: str, encoding: str = None, errors: str = None
+        ) -> int:
+            nonlocal path
+            path = self
+            if error:
+                e = OSError(error, "something went badly")
+            else:
+                e = Exception("Nobody expects the Spanish Exception")
+            raise e
+
+        real_unlink = Path.unlink
+        unlinks = []
+
+        def record_unlink(self, **kwargs):
+            unlinks.append(self.name)
+            real_unlink(self, **kwargs)
+
+        datafile, md5_file, md5 = tarball
+        monkeypatch.setattr(Path, "write_text", nogood_write)
+        monkeypatch.setattr(Path, "unlink", record_unlink)
+        with datafile.open("rb") as data_fp:
+            response = client.put(
+                self.gen_uri(server_config, datafile.name),
+                data=data_fp,
+                headers=self.gen_headers(pbench_drb_token, md5),
+            )
+        assert path.name == md5_file.name
+        assert md5_file.name in unlinks
+        assert datafile.name in unlinks
+        assert response.status_code == http_status
+        assert response.json.get("message").startswith(message)
 
     def test_invalid_authorization_upload(
         self, client, caplog, server_config, pbench_drb_token_invalid
@@ -391,6 +468,14 @@ def td_exists(self, *args, **kwargs):
         (
             (Exception("Test"), HTTPStatus.INTERNAL_SERVER_ERROR),
             (DuplicateTarball("x"), HTTPStatus.BAD_REQUEST),
+            (
+                OSError(errno.ENOSPC, "The closet is too small!"),
+                HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+            ),
+            (
+                OSError(errno.EACCES, "Can't get they-ah from he-ah"),
+                HTTPStatus.INTERNAL_SERVER_ERROR,
+            ),
         ),
     )
     def test_upload_cachemanager_error(
@@ -583,19 +668,20 @@ def test_upload_duplicate(self, client, server_config, pbench_drb_token, tarball
         # We didn't get far enough to create a CacheManager
         assert TestUpload.cachemanager_created is None
 
-    def test_upload_metadata_error(
+    def test_upload_metalog_error(
         self, client, monkeypatch, server_config, pbench_drb_token, tarball
     ):
-        """
-        Cause the Metadata.setvalue to fail at the very end of the upload so we
-        can test recovery handling.
+        """Test handling of post-intake error recording metalog
+
+        Cause Metadata.create (which creates the "dataset.metalog" namespace)
+        to fail at the very end of the upload so we can test recovery handling.
         """
         datafile, _, md5 = tarball
 
-        def setvalue(dataset: Dataset, key: str, value: Any):
-            raise MetadataProtectedKey(key)
+        def create(**kwargs):
+            raise MetadataMissingParameter("dataset")
 
-        monkeypatch.setattr(Metadata, "setvalue", setvalue)
+        monkeypatch.setattr(Metadata, "create", create)
 
         with datafile.open("rb") as data_fp:
             response = client.put(
@@ -646,6 +732,37 @@ def setvalue(dataset: Dataset, key: str, value: Any):
             .startswith("Internal Pbench Server Error: log reference ")
         )
 
+    def test_upload_metadata_error(
+        self, client, monkeypatch, server_config, pbench_drb_token, tarball
+    ):
+        """Test handling of post-intake error setting metadata
+
+        Cause Metadata.setvalue to fail. This should be reported in "failures"
+        without failing the upload.
+        """
+        datafile, _, md5 = tarball
+
+        def setvalue(
+            dataset: Dataset, key: str, value: Any, user: Optional[User] = None
+        ):
+            raise MetadataSqlError("test", dataset, key)
+
+        monkeypatch.setattr(Metadata, "setvalue", setvalue)
+
+        with datafile.open("rb") as data_fp:
+            response = client.put(
+                self.gen_uri(server_config, datafile.name),
+                data=data_fp,
+                headers=self.gen_headers(pbench_drb_token, md5),
+            )
+
+        assert response.status_code == HTTPStatus.CREATED
+        audit = Audit.query()
+        assert len(audit) == 2
+        fails = audit[1].attributes["failures"]
+        assert isinstance(fails, dict)
+        assert fails["server.benchmark"].startswith("Error test ")
+
     @pytest.mark.freeze_time("1970-01-01")
     def test_upload_archive(self, client, pbench_drb_token, server_config, tarball):
         """Test a successful archiveonly dataset upload."""
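For reference beyond the diff itself, the following is a minimal, self-contained sketch of the
error-mapping pattern this change applies at each write site (tarball stream, MD5 file, and
cache-manager archive). The save_stream helper and the stand-in APIAbort/APIInternalError
classes are illustrative assumptions rather than the pbench implementations; the real _intake
path also maintains the MD5 hash and registers recovery actions, as the patch shows.

import errno
from http import HTTPStatus
from pathlib import Path


class APIAbort(Exception):
    """Stand-in for pbench's APIAbort: carries an HTTP status for the client."""

    def __init__(self, status: HTTPStatus, message: str):
        super().__init__(message)
        self.status = status


class APIInternalError(Exception):
    """Stand-in for pbench's APIInternalError: reported to the client as a 500."""


def save_stream(target: Path, chunks) -> None:
    """Write a data stream to 'target', cleaning up and mapping errors.

    On any OSError the partial file is unlinked so no debris is left in the
    archive tree; ENOSPC is reported as 413 REQUEST_ENTITY_TOO_LARGE (a
    standard status Werkzeug can return) instead of a generic internal error.
    """
    try:
        with target.open(mode="wb") as ofp:
            for chunk in chunks:
                ofp.write(chunk)
    except OSError as exc:
        target.unlink(missing_ok=True)  # don't leave a partial file behind
        if exc.errno == errno.ENOSPC:
            raise APIAbort(
                HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
                f"Out of space on {target.parent}",
            ) from exc
        raise APIInternalError(
            f"Unexpected error encountered during file upload: {exc!r}"
        ) from exc

A caller that catches APIAbort can relay its status attribute to the HTTP layer, which is how
the new REQUEST_ENTITY_TOO_LARGE response reaches the client in the tests above.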