diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 92bf8104f7..17ad65bb51 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -292,12 +292,13 @@ class Tarball: TAR_EXEC_TIMEOUT = 60.0 TAR_EXEC_WAIT = 0.02 - def __init__(self, path: Path, controller: "Controller"): + def __init__(self, path: Path, resource_id: str, controller: "Controller"): """Construct a `Tarball` object instance Args: path: The file path to a discovered tarball (.tar.xz file) in the configured ARCHIVE directory for a controller. + resource_id: The dataset resource ID controller: The associated Controller object """ self.logger: Logger = controller.logger @@ -307,14 +308,20 @@ def __init__(self, path: Path, controller: "Controller"): # Record the Dataset resource ID (MD5) for coordination with the server # logic - self.resource_id: str = get_tarball_md5(path) + self.resource_id: str = resource_id # Record a backlink to the containing controller object self.controller: Controller = controller + # Record the tarball isolation directory + self.isolator = controller.path / resource_id + # Record the path of the tarball file self.tarball_path: Path = path + # Record the path of the companion MD5 file + self.md5_path: Path = path.with_suffix(".xz.md5") + # Record where cached unpacked data would live self.cache: Path = controller.cache / self.resource_id @@ -326,9 +333,6 @@ def __init__(self, path: Path, controller: "Controller"): # inactive. self.unpacked: Optional[Path] = None - # Record the path of the companion MD5 file - self.md5_path: Path = path.with_suffix(".xz.md5") - # Record the name of the containing controller self.controller_name: str = controller.name @@ -364,11 +368,21 @@ def check_unpacked(self): # unpacked directory tree. @classmethod - def create(cls, tarball: Path, controller: "Controller") -> "Tarball": + def create( + cls, tarball: Path, resource_id: str, controller: "Controller" + ) -> "Tarball": """An alternate constructor to import a tarball This moves a new tarball into the proper place along with the md5 companion file. It returns the new Tarball object. + + Args: + tarball: location of the tarball + resource_id: tarball resource ID + controller: associated controller object + + Returns: + Tarball object """ # Validate the tarball suffix and extract the dataset name @@ -378,8 +392,23 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball": # standard .tar.xz md5_source = tarball.with_suffix(".xz.md5") - destination = controller.path / tarball.name - md5_destination = controller.path / md5_source.name + # It's possible that two similar benchmark runs at about the same + # time can result in identical filenames with distinct MD5 values + # (for example, the same basic benchmark run on two hosts, which + # has been observed in automated cloud testing). To avoid problems, we + # "isolate" each tarball and its MD5 companion in a subdirectory with + # the md5 (resource_id) string to prevent collisions. The Tarball + # object maintains this, but we need it here, first, to move the + # files. + isolator = controller.path / resource_id + + # NOTE: we enable "parents" and "exist_ok" not because we expect these + # conditions (both should be impossible) but because it's not worth an + # extra error check. We'll fail below if either *file* already + # exists in the isolator directory. 
+ isolator.mkdir(parents=True, exist_ok=True) + destination = isolator / tarball.name + md5_destination = isolator / md5_source.name # If either expected destination file exists, something is wrong if destination.exists() or md5_destination.exists(): @@ -436,7 +465,7 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball": except Exception as e: controller.logger.error("Error removing staged MD5 {}: {}", name, e) - return cls(destination, controller) + return cls(destination, resource_id, controller) def cache_map(self, dir_path: Path): """Builds Hierarchy structure of a Directory in a Dictionary @@ -784,20 +813,31 @@ def delete(self): files. There's nothing more we can do. """ self.uncache() - if self.md5_path: - try: - self.md5_path.unlink() - except Exception as e: - self.logger.error("archive unlink for {} failed with {}", self.name, e) - self.md5_path = None - if self.tarball_path: + if self.isolator and self.isolator.exists(): try: - self.tarball_path.unlink() + shutil.rmtree(self.isolator) except Exception as e: - self.logger.error( - "archive MD5 unlink for {} failed with {}", self.name, e - ) + self.logger.error("isolator delete for {} failed with {}", self.name, e) + self.isolator = None self.tarball_path = None + self.md5_path = None + else: + if self.md5_path: + try: + self.md5_path.unlink() + except Exception as e: + self.logger.error( + "archive unlink for {} failed with {}", self.name, e + ) + self.md5_path = None + if self.tarball_path: + try: + self.tarball_path.unlink() + except Exception as e: + self.logger.error( + "archive MD5 unlink for {} failed with {}", self.name, e + ) + self.tarball_path = None class Controller: @@ -808,21 +848,6 @@ class Controller: but the audit report generator will flag it. """ - @staticmethod - def delete_if_empty(directory: Path) -> None: - """Delete a directory only if it exists and is empty. - - NOTE: rmdir technically will fail if the directory isn't empty, but - this feels safer. - - Any exceptions raised will be propagated. - - Args: - directory: Directory path - """ - if directory.exists() and not any(directory.iterdir()): - directory.rmdir() - def __init__(self, path: Path, cache: Path, logger: Logger): """Manage the representation of a controller archive on disk. @@ -857,20 +882,37 @@ def __init__(self, path: Path, cache: Path, logger: Logger): # constructor! self._discover_tarballs() + def _add_if_tarball(self, file: Path, md5: Optional[str] = None): + """Check for a tar file, and create an object + + Args: + file: path of potential tarball + md5: known MD5 hash, or None to compute here + """ + if file.is_file() and Dataset.is_tarball(file): + hash = md5 if md5 else get_tarball_md5(file) + tarball = Tarball(file, hash, self) + self.tarballs[tarball.name] = tarball + self.datasets[tarball.resource_id] = tarball + tarball.check_unpacked() + def _discover_tarballs(self): """Discover the known tarballs Look in the ARCHIVE tree's controller directory for tarballs, and add - them to the known set. We also check for unpacked directories in the - CACHE tree matching the resource_id of any tarballs we find in order - to link them. + them to the known set. "Old" tarballs may be at the top level, "new" + tarballs are in "resource_id" isolation directories. + + We also check for unpacked directories in the CACHE tree matching the + resource_id of any tarballs we find in order to link them. 
""" for file in self.path.iterdir(): - if file.is_file() and Dataset.is_tarball(file): - tarball = Tarball(file, self) - self.tarballs[tarball.name] = tarball - self.datasets[tarball.resource_id] = tarball - tarball.check_unpacked() + if file.is_dir(): + md5 = file.name + for tar in file.iterdir(): + self._add_if_tarball(tar, md5) + else: + self._add_if_tarball(file) @classmethod def create( @@ -896,7 +938,7 @@ def create_tarball(self, tarfile_path: Path) -> Tarball: Returns: Tarball object """ - tarball = Tarball.create(tarfile_path, self) + tarball = Tarball.create(tarfile_path, get_tarball_md5(tarfile_path), self) self.datasets[tarball.resource_id] = tarball self.tarballs[tarball.name] = tarball return tarball @@ -1023,7 +1065,7 @@ def __init__(self, options: PbenchServerConfig, logger: Logger): # resource_id. self.datasets: dict[str, Tarball] = {} - def full_discovery(self): + def full_discovery(self) -> "CacheManager": """Discover the ARCHIVE and CACHE trees NOTE: both _discover_unpacked() and _discover_results() rely on the @@ -1033,6 +1075,7 @@ def full_discovery(self): specific dataset. """ self._discover_controllers() + return self def __contains__(self, dataset_id: str) -> bool: """Determine whether the cache manager includes a dataset. @@ -1066,7 +1109,12 @@ def _clean_empties(self, controller: str): controller: Name of the controller to clean up """ archive = self.options.ARCHIVE / controller - if archive.exists() and not any(archive.glob(f"*{Dataset.TARBALL_SUFFIX}")): + + # Look for tarballs in the controller or subdirectory. (NOTE: this + # provides compatibility with tarballs in an "isolator" subdirectory + # and older tarballs without isolators by using the recursive "**/" + # glob syntax.) + if archive.exists() and not any(archive.glob(f"**/*{Dataset.TARBALL_SUFFIX}")): self.delete_if_empty(archive) del self.controllers[controller] @@ -1125,7 +1173,7 @@ def find_dataset(self, dataset_id: str) -> Tarball: # and (if found) discover the controller containing that dataset. for dir_entry in self.archive_root.iterdir(): if dir_entry.is_dir() and dir_entry.name != self.TEMPORARY: - for file in dir_entry.glob(f"*{Dataset.TARBALL_SUFFIX}"): + for file in dir_entry.glob(f"**/*{Dataset.TARBALL_SUFFIX}"): md5 = get_tarball_md5(file) if md5 == dataset_id: self._add_controller(dir_entry) @@ -1254,7 +1302,6 @@ def uncache(self, dataset_id: str): tarball = self.find_dataset(dataset_id) controller = tarball.controller controller.uncache(dataset_id) - self._clean_empties(controller.name) def delete(self, dataset_id: str): """Delete the dataset as well as unpacked artifacts. 
@@ -1264,10 +1311,11 @@ def delete(self, dataset_id: str): """ try: tarball = self.find_dataset(dataset_id) - name = tarball.name - tarball.controller.delete(dataset_id) - del self.datasets[dataset_id] - del self.tarballs[name] - self._clean_empties(tarball.controller_name) except TarballNotFound: return + name = tarball.name + tarball.controller.delete(dataset_id) + del self.datasets[dataset_id] + del self.tarballs[name] + + self._clean_empties(tarball.controller_name) diff --git a/lib/pbench/server/indexer.py b/lib/pbench/server/indexer.py index 65d493720c..bfd0d145dd 100644 --- a/lib/pbench/server/indexer.py +++ b/lib/pbench/server/indexer.py @@ -33,6 +33,7 @@ UnsupportedTarballFormat, ) import pbench.server +from pbench.server.cache_manager import Tarball from pbench.server.database.models.datasets import Dataset from pbench.server.database.models.index_map import IndexMapType from pbench.server.templates import PbenchTemplates @@ -3190,23 +3191,22 @@ def __init__( self, idxctx: "IdxContext", dataset: Dataset, - tbarg: str, tmpdir: str, - extracted_root: str, + tarobj: Tarball, ): """Context for indexing a tarball. Args: idxctx: The controlling Elasticsearch (pyesbulk) indexing context dataset: The Dataset object representing the tarball - tbarg: The filesystem path to the tarball (as a string) tmpdir: The path to a temporary directory (as a string) - extracted_root: The path to the extracted tarball data (as a string) + tarobj: The cache manager tarball object """ self.idxctx = idxctx self.authorization = {"owner": str(dataset.owner_id), "access": dataset.access} - self.tbname = tbarg - self.controller_dir = os.path.basename(os.path.dirname(self.tbname)) + self.tarobj = tarobj + self.tbname = str(tarobj.tarball_path) + self.controller_dir = tarobj.controller.name try: self.satellite, self.controller_name = self.controller_dir.split("::", 1) except Exception: @@ -3265,7 +3265,7 @@ def __init__( '{} - tar ball is missing "{}".'.format(self.tbname, metadata_log_path) ) - self.extracted_root = extracted_root + self.extracted_root = tarobj.cache if not os.path.isdir(os.path.join(self.extracted_root, self.dirname)): raise UnsupportedTarballFormat( '{} - extracted tar ball directory "{}" does not' @@ -3273,10 +3273,10 @@ def __init__( self.tbname, os.path.join(self.extracted_root, self.dirname) ) ) - # Open the MD5 file of the tar ball and read the MD5 sum from it. - md5sum = open("%s.md5" % (self.tbname)).read().split()[0] + # Construct the @metadata and run metadata dictionaries from the # metadata.log file. + md5sum = dataset.resource_id self.mdconf = MetadataLog() mdf = os.path.join(self.extracted_root, metadata_log_path) try: @@ -3429,7 +3429,7 @@ def __init__( # using the controller directory, tar ball name (not path), and its # MD5 value so that warnings, errors, and exceptions can have # additional context to add. - self._tbctx = f"{self.controller_dir}/{os.path.basename(tbarg)}({md5sum})" + self._tbctx = f"{self.controller_dir}/{tarobj.name}({md5sum})" def map_document(self, root_idx: str, index: str, id: str) -> None: """ diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 3a4e80d572..7d749b15ef 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -364,8 +364,6 @@ def sighup_handler(*args): tarobj: Optional[Tarball] = None tb_res = error_code["OK"] try: - path = os.path.realpath(tb) - # Dynamically unpack the tarball for indexing. 
try: tarobj = self.cache_manager.unpack(dataset.resource_id) @@ -374,7 +372,6 @@ def sighup_handler(*args): "{} has not been unpacked", dataset ) continue - unpacked = tarobj.cache # The indexer needs the root except TarballNotFound as e: self.sync.error( dataset, @@ -392,7 +389,7 @@ def sighup_handler(*args): # "Open" the tar ball represented by the tar ball object idxctx.logger.debug("open tar ball") - ptb = PbenchTarBall(idxctx, dataset, path, tmpdir, unpacked) + ptb = PbenchTarBall(idxctx, dataset, tmpdir, tarobj) # Construct the generator for emitting all actions. # The `idxctx` dictionary is passed along to each diff --git a/lib/pbench/test/functional/server/tarballs/special/fio_rw_2018.02.01T22.40.57.tar.xz b/lib/pbench/test/functional/server/tarballs/special/fio_rw_2018.02.01T22.40.57.tar.xz new file mode 100644 index 0000000000..afcc4018ff Binary files /dev/null and b/lib/pbench/test/functional/server/tarballs/special/fio_rw_2018.02.01T22.40.57.tar.xz differ diff --git a/lib/pbench/test/functional/server/tarballs/special/fio_rw_2018.02.01T22.40.57.tar.xz.md5 b/lib/pbench/test/functional/server/tarballs/special/fio_rw_2018.02.01T22.40.57.tar.xz.md5 new file mode 100644 index 0000000000..eeaf1925c1 --- /dev/null +++ b/lib/pbench/test/functional/server/tarballs/special/fio_rw_2018.02.01T22.40.57.tar.xz.md5 @@ -0,0 +1 @@ +08516cc7448035be2cc502f0517783fa fio_rw_2018.02.01T22.40.57.tar.xz diff --git a/lib/pbench/test/functional/server/test_datasets.py b/lib/pbench/test/functional/server/test_datasets.py index 1f0c769f0a..52ca1c88bb 100644 --- a/lib/pbench/test/functional/server/test_datasets.py +++ b/lib/pbench/test/functional/server/test_datasets.py @@ -17,6 +17,8 @@ TARBALL_DIR = Path("lib/pbench/test/functional/server/tarballs") SPECIAL_DIR = TARBALL_DIR / "special" +NOMETADATA = SPECIAL_DIR / "nometadata.tar.xz" +DUPLICATE_NAME = SPECIAL_DIR / "fio_rw_2018.02.01T22.40.57.tar.xz" SHORT_EXPIRATION_DAYS = 10 @@ -125,7 +127,14 @@ def test_upload_all(self, server_client: PbenchServerClient, login_user): if dataset.name not in expected: continue t = tarballs[dataset.name] - assert dataset.name in dataset.metadata["server.tarball-path"] + + # Check that the tarball path has the expected file name and + # that it's in the expected isolation directory. + path = Path(dataset.metadata["server.tarball-path"]) + assert path.name == f"{dataset.name}.tar.xz" + assert path.parent.name == dataset.resource_id + + # Check that the upload was successful assert dataset.metadata["dataset.operations"]["UPLOAD"]["state"] == "OK" assert ( dataset.metadata["dataset.metalog.pbench.script"] @@ -240,7 +249,7 @@ def test_no_metadata(server_client: PbenchServerClient, login_user): Try to upload a new tarball with no `metadata.log` file, and validate that it doesn't get enabled for unpacking or indexing. """ - tarball = SPECIAL_DIR / "nometadata.tar.xz" + tarball = NOMETADATA name = Dataset.stem(tarball) md5 = Dataset.md5(tarball) response = server_client.upload(tarball) @@ -276,14 +285,54 @@ def test_no_metadata(server_client: PbenchServerClient, login_user): } assert metadata["server.benchmark"] == "unknown" assert metadata["server.archiveonly"] is True - - # NOTE: we could wait here; however, the UNPACK operation is only - # enabled by upload, and INDEX is only enabled by UNPACK: so if they're - # not here immediately after upload, they'll never be here. 
operations = metadata["dataset.operations"] assert operations["UPLOAD"]["state"] == "OK" assert "INDEX" not in operations + @staticmethod + def test_duplicate_name(server_client: PbenchServerClient, login_user): + """Test handling for a tarball with a duplicate name but unique MD5. + + In automated cloud testing we frequently see datasets started with + similar parameters at about the same time: these can result in + datasets with the same filename but distinct MD5 hashes (e.g., run + on different systems and/or at slightly different times that round to + the same second). Make sure that the server handles this gracefully. + """ + tarball = DUPLICATE_NAME + name = Dataset.stem(tarball) + md5 = Dataset.md5(tarball) + response = server_client.upload(tarball) + assert ( + response.status_code == HTTPStatus.CREATED + ), f"upload {name} returned unexpected status {response.status_code}, {response.text}" + + assert response.json() == { + "message": "File successfully uploaded", + "name": name, + "resource_id": md5, + "notes": [ + "Identified benchmark workload 'fio'.", + f"Expected expiration date is {expiration()}.", + ], + } + assert response.headers["location"] == server_client._uri( + API.DATASETS_INVENTORY, {"dataset": md5, "target": ""} + ) + metadata = server_client.get_metadata( + md5, + [ + "dataset.operations", + "server.archiveonly", + "server.benchmark", + ], + ) + assert metadata["server.benchmark"] == "fio" + assert not metadata["server.archiveonly"] + operations = metadata["dataset.operations"] + assert operations["UPLOAD"]["state"] == "OK" + assert "INDEX" in operations + @staticmethod def check_indexed(server_client: PbenchServerClient, datasets): indexed = [] @@ -321,10 +370,13 @@ def check_indexed(server_client: PbenchServerClient, datasets): @pytest.mark.dependency(name="index", depends=["upload"], scope="session") def test_index_all(self, server_client: PbenchServerClient, login_user): - """Wait for datasets to reach the "Indexed" state, and ensure that the - state and metadata look good. + """Wait for indexable datasets to reach the "Indexed" state, and ensure + that the state and metadata look good. """ - tarball_names = frozenset(t.name for t in TARBALL_DIR.glob("*.tar.xz")) + + # We can't use a set (or dict) here as we have duplicate tarball names + # and we need to retain the original count of matches. + tarball_names = [i.name for i in all_tarballs()] print(" ... reporting dataset status ...") # Test get_list pagination: to avoid forcing a list, we'll count the @@ -347,8 +399,13 @@ def test_index_all(self, server_client: PbenchServerClient, login_user): not_indexed = [] try: for dataset in not_indexed_raw: - tp = dataset.metadata["server.tarball-path"] - if os.path.basename(tp) not in tarball_names: + tp = Path(dataset.metadata["server.tarball-path"]) + + # Ignore unexpected and non-indexable datasets + if ( + tp.name not in tarball_names + or dataset.metadata["server.archiveonly"] + ): continue not_indexed.append(dataset) if dataset.metadata["user.pbench.access"] == "public": @@ -384,9 +441,6 @@ def test_index_all(self, server_client: PbenchServerClient, login_user): print(f"[{(now - start).total_seconds():0.2f}] Checking ...") not_indexed, indexed = TestPut.check_indexed(server_client, not_indexed) for dataset in indexed: - tp = dataset.metadata["server.tarball-path"] - if os.path.basename(tp) not in tarball_names: - continue count += 1 print(f"\t... 
indexed {dataset.name}") now = datetime.utcnow() @@ -399,9 +453,6 @@ def test_index_all(self, server_client: PbenchServerClient, login_user): f"Timed out after {(now - start).total_seconds()} seconds; unindexed datasets: " + ", ".join(d.name for d in not_indexed) ) - assert ( - len(tarball_names) == count - ), f"Didn't find all expected datasets, found {count} of {len(tarball_names)}" class TestIndexing: @@ -826,21 +877,23 @@ def test_delete_all(self, server_client: PbenchServerClient, login_user): print(" ... reporting behaviors ...") - datasets = server_client.get_list() - datasets_hash = {} try: - for dataset in datasets: - datasets_hash[f"{dataset.name}.tar.xz"] = dataset.resource_id + datasets = server_client.get_list() except HTTPError as exc: pytest.fail( f"Unexpected HTTP error, url = {exc.response.url}, status code" f" = {exc.response.status_code}, text = {exc.response.text!r}" ) - for t in all_tarballs(): - resource_id = datasets_hash.get(t.name) - assert resource_id, f"Expected to find tar ball {t.name} to delete" - response = server_client.remove(resource_id) - assert ( - response.ok - ), f"delete failed with {response.status_code}, {response.text}" - print(f"\t ... deleted {t.name}") + + # Create a set of all tarball names we expect. We'll delete any dataset + # returned by the "get_list" query which is on this list. (NOTE that + # the "set" collapses duplicate names; but it doesn't matter here as + # they'll both match.) + all = set(Dataset.stem(d) for d in all_tarballs()) + for d in datasets: + if d.name in all: + response = server_client.remove(d.resource_id) + assert ( + response.ok + ), f"delete {d.name} failed with {response.status_code}, {response.text}" + print(f"\t ... deleted {d.name}") diff --git a/lib/pbench/test/unit/server/conftest.py b/lib/pbench/test/unit/server/conftest.py index 9716934d14..ac61e8c88d 100644 --- a/lib/pbench/test/unit/server/conftest.py +++ b/lib/pbench/test/unit/server/conftest.py @@ -979,6 +979,36 @@ def current_user(self) -> User: yield create_admin_user +def make_tarball(tarball: Path, date: str) -> tuple[Path, str]: + """Helper to create a test tarball + + Creates a companion MD5 file in the same directory, returning the filename + and the MD5 hash for convenience. + + Args: + tarball: The path of the new tarball. + date: The "creation date" of the tarball metadata + + Returns: + Path and value of the MD5 hash, as a tuple + """ + md5file = tarball.with_suffix(".xz.md5") + metadata = MetadataLog() + metadata.add_section("pbench") + metadata.set("pbench", "date", date) + metadata_file = tarball.parent / "metadata.log" + tarball.parent.mkdir(parents=True, exist_ok=True) + with metadata_file.open("w") as meta_fp: + metadata.write(meta_fp) + with tarfile.open(tarball, "w:xz") as tar: + tar.add(str(metadata_file), arcname=f"{Dataset.stem(tarball)}/metadata.log") + metadata_file.unlink() + md5 = hashlib.md5() + md5.update(tarball.read_bytes()) + md5file.write_text(f"{md5.hexdigest()} {md5file.name}\n") + return md5file, md5.hexdigest() + + @pytest.fixture() def tarball(tmp_path): """Create a test tarball and MD5 file. 
@@ -991,20 +1021,9 @@ def tarball(tmp_path): """ filename = "pbench-user-benchmark_some + config_2021.05.01T12.42.42.tar.xz" datafile = tmp_path / filename - metadata = MetadataLog() - metadata.add_section("pbench") - metadata.set("pbench", "date", "2002-05-16") - metadata_file = tmp_path / "metadata.log" - with metadata_file.open("w") as meta_fp: - metadata.write(meta_fp) - with tarfile.open(datafile, "w:xz") as tar: - tar.add(str(metadata_file), arcname=f"{Dataset.stem(filename)}/metadata.log") - md5 = hashlib.md5() - md5.update(datafile.read_bytes()) - md5file = tmp_path / (filename + ".md5") - md5file.write_text(md5.hexdigest()) + md5file, md5hash = make_tarball(datafile, "2002-05-16") - yield datafile, md5file, md5.hexdigest() + yield datafile, md5file, md5hash # Clean up after the test case diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index b108f99521..f432b6118d 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -28,6 +28,7 @@ TarballUnpackError, ) from pbench.server.database.models.datasets import Dataset, DatasetBadName +from pbench.test.unit.server.conftest import make_tarball @pytest.fixture(scope="function", autouse=True) @@ -237,8 +238,8 @@ def test_duplicate( self, monkeypatch, selinux_disabled, server_config, make_logger, tarball ): """ - Test behavior when we try to create a new dataset but the tarball file - name already exists + Test behavior when we create a new dataset with a tarball file name + and MD5 that already exists """ source_tarball, source_md5, md5 = tarball @@ -247,8 +248,8 @@ def test_duplicate( # Create a tarball file in the expected destination directory: the # subsequent create should report a duplicate. 
- controller = cm.archive_root / "ABC" - controller.mkdir() + controller = cm.archive_root / "ABC" / md5 + controller.mkdir(parents=True) (controller / source_tarball.name).write_text("Send in the clones") with pytest.raises(DuplicateTarball) as exc: cm.create(source_tarball) @@ -501,10 +502,11 @@ def generate_test_result_tree(tmp_path: Path, dir_name: str) -> Path: return sub_dir class MockTarball: - def __init__(self, path: Path, controller: Controller): + def __init__(self, path: Path, resource_id: str, controller: Controller): self.name = Dataset.stem(path) self.tarball_path = path self.cache = controller.cache / "ABC" + self.isolator = controller.path / resource_id self.unpacked = None def test_unpack_tar_subprocess_exception(self, make_logger, monkeypatch): @@ -532,7 +534,9 @@ def mock_run(command, _dir_path, exception, dir_p): m.setattr(shutil, "rmtree", mock_rmtree) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - tb = Tarball(tar, Controller(Path("/mock/archive"), cache, make_logger)) + tb = Tarball( + tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + ) with pytest.raises(TarballUnpackError) as exc: tb.unpack() msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds" @@ -567,7 +571,9 @@ def mock_run(command, _dir_path, exception, dir_p): m.setattr(shutil, "rmtree", mock_rmtree) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - tb = Tarball(tar, Controller(Path("/mock/archive"), cache, make_logger)) + tb = Tarball( + tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + ) with pytest.raises(TarballModeChangeError) as exc: tb.unpack() @@ -603,7 +609,9 @@ def mock_resolve(_path, _strict=False): m.setattr(Path, "resolve", mock_resolve) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - tb = Tarball(tar, Controller(Path("/mock/archive"), cache, make_logger)) + tb = Tarball( + tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + ) tb.unpack() assert call == ["tar", "find"] assert tb.unpacked == cache / "ABC" / tb.name @@ -616,7 +624,9 @@ def test_cache_map_success(self, make_logger, monkeypatch, tmp_path): with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - tb = Tarball(tar, Controller(Path("/mock/archive"), cache, make_logger)) + tb = Tarball( + tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + ) tar_dir = TestCacheManager.MockController.generate_test_result_tree( tmp_path, "dir_name" ) @@ -677,7 +687,9 @@ def test_cache_map_bad_dir_path( with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - tb = Tarball(tar, Controller(Path("/mock/archive"), cache, make_logger)) + tb = Tarball( + tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + ) tar_dir = TestCacheManager.MockController.generate_test_result_tree( tmp_path, "dir_name" ) @@ -856,7 +868,9 @@ def test_cache_map_traverse_cmap( with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", 
TestCacheManager.MockController.__init__) - tb = Tarball(tar, Controller(Path("/mock/archive"), cache, make_logger)) + tb = Tarball( + tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + ) tar_dir = TestCacheManager.MockController.generate_test_result_tree( tmp_path, "dir_name" ) @@ -942,7 +956,9 @@ def test_cache_map_get_info_cmap( with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - tb = Tarball(tar, Controller(Path("/mock/archive"), cache, make_logger)) + tb = Tarball( + tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + ) tar_dir = TestCacheManager.MockController.generate_test_result_tree( tmp_path, "dir_name" ) @@ -973,7 +989,9 @@ def test_get_inventory( m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) m.setattr(Tarball, "extract", lambda _t, _p: Inventory(exp_stream)) m.setattr(Path, "open", lambda _s, _m="rb": exp_stream) - tb = Tarball(tar, Controller(Path("/mock/archive"), cache, make_logger)) + tb = Tarball( + tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + ) tar_dir = TestCacheManager.MockController.generate_test_result_tree( tmp_path, "dir_name" ) @@ -1107,6 +1125,7 @@ def mock_shutil_which( m.setattr(shutil, "which", mock_shutil_which) m.setattr(subprocess, "Popen", MockPopen) m.setattr(Inventory, "close", MockBufferedReader.close) + m.setattr(Tarball, "TAR_EXEC_TIMEOUT", 0.1) try: got = Tarball.extract(tar, path) @@ -1516,12 +1535,12 @@ def test_lifecycle( assert not source_tarball.exists() assert not source_md5.exists() - tarfile = archive / source_tarball.name - md5file = archive / source_md5.name + tarfile = archive / md5 / source_tarball.name + md5file = archive / md5 / source_md5.name assert tarfile.exists() assert md5file.exists() - assert md5 == md5file.read_text() + assert md5 == md5file.read_text().split()[0] md5_hash = hashlib.md5() md5_hash.update(tarfile.read_bytes()) assert md5 == md5_hash.hexdigest() @@ -1576,3 +1595,53 @@ def test_lifecycle( assert not archive.exists() assert not cm.controllers assert not cm.datasets + + def test_compatibility( + self, selinux_enabled, server_config, make_logger, tarball, monkeypatch + ): + """Test compatibility with both new "isolated" and old tarballs + + Make sure we can discover and manage (unpack and delete) both new + tarballs with an MD5 isolation directory and pre-existing tarballs + directly in the controller directory. + """ + + monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata) + source_tarball, source_md5, md5 = tarball + cm = CacheManager(server_config, make_logger) + + archive = cm.archive_root / "ABC" + + # Manually create an unisolated "pre-existing" copy of the tarball and + # MD5 file in the controller directory. 
+ + _, id = make_tarball(archive / source_tarball.name, "2023-09-18") + assert id != md5 + cm.create(source_tarball) + + # Rediscover the cache, which should find both tarballs + cm1 = CacheManager(server_config, make_logger).full_discovery() + t1 = cm1[md5] + t2 = cm1[id] + assert t1.name == t2.name == Dataset.stem(source_tarball) + + t1.unpack() + t2.unpack() + + assert t1.unpacked != t2.unpacked + assert (t1.unpacked / "metadata.log").is_file() + assert (t2.unpacked / "metadata.log").is_file() + + tar1 = t1.tarball_path + tar2 = t2.tarball_path + + assert tar1 == tar2.parent / t1.resource_id / tar1.name + + t1.delete() + t2.delete() + + # Check that the tarballs, and the tar1 isolation directory, + # were removed. + assert not tar1.exists() + assert not tar2.exists() + assert not tar1.parent.exists() diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py index 7056f3031a..5ee642591e 100644 --- a/lib/pbench/test/unit/server/test_indexing_tarballs.py +++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py @@ -203,16 +203,15 @@ class FakePbenchTarBall: def __init__( self, idxctx: FakeIdxContext, - username: str, - tbarg: str, + dataset: FakeDataset, tmpdir: str, - extracted_root: str, + tarobj: "FakeTarball", ): self.idxctx = idxctx - self.tbname = tbarg - self.name = Path(tbarg).name - self.username = username - self.extracted_root = extracted_root + self.tbname = tarobj.controller.name + self.name = tarobj.name + self.username = dataset.owner_id + self.extracted_root = tarobj.cache self.index_map = {"root": {"idx1": ["id1", "id2"]}} def mk_tool_data_actions(self) -> JSONARRAY: @@ -275,25 +274,30 @@ def __init__(self, path: Path, cache: Path, logger: Logger): class FakeTarball: - def __init__(self, path: Path, controller: FakeController): + def __init__(self, path: Path, resource_id: str, controller: FakeController): self.name = path.name self.tarball_path = path self.controller = controller self.cache = controller.cache / "ABC" self.unpacked = self.cache / self.name + self.isolator = controller.path / resource_id self.uncache = lambda: None class FakeCacheManager: + lookup = {"ABC": "ds1", "ACDF": "ds2", "GHIJ": "ds3"} + def __init__(self, config: PbenchServerConfig, logger: Logger): self.config = config self.logger = logger self.datasets = {} - def unpack(self, resource_id: str) -> FakeTarball: - controller = FakeController(Path("/archive/ctrl"), Path("/.cache"), self.logger) + def unpack(self, resource_id: str): + controller = FakeController(Path("/archive/ABC"), Path("/.cache"), self.logger) return FakeTarball( - Path(f"/archive/ctrl/tarball-{resource_id}.tar.xz"), controller + Path(f"/archive/ABC/{resource_id}/{self.lookup[resource_id]}.tar.xz"), + resource_id, + controller, )
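
Illustrative note (not part of the patch): assuming a controller directory named "ABC" and a dataset whose MD5 (resource_id) is the placeholder "abc123...", the isolation scheme introduced above arranges the ARCHIVE tree roughly as sketched below; older, pre-isolation tarballs remain directly under the controller directory and are still discovered.

    # Hypothetical layout; directory names other than the fio_rw tarball from
    # the functional-test fixtures are placeholders, not values from the patch.
    #
    # ARCHIVE/
    #   ABC/                                    <- controller directory
    #     abc123.../                            <- isolator (resource_id) directory
    #       fio_rw_2018.02.01T22.40.57.tar.xz
    #       fio_rw_2018.02.01T22.40.57.tar.xz.md5
    #     older_run.tar.xz                      <- pre-isolation tarball (top level)
    #     older_run.tar.xz.md5
    #
    # In code terms, as Tarball.create computes them above:
    #   isolator        = controller.path / resource_id
    #   destination     = isolator / tarball.name
    #   md5_destination = isolator / md5_source.name
    #
    # Discovery handles both forms: _discover_tarballs() treats a directory
    # entry as an isolator and a file entry as an old-style tarball, and
    # _clean_empties()/find_dataset() use the recursive "**/" glob so either
    # layout matches.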