diff --git a/dissect/target/loader.py b/dissect/target/loader.py index 08d5e29b0..7c93128e1 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -178,6 +178,7 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader: register("remote", "RemoteLoader") register("mqtt", "MQTTLoader") register("asdf", "AsdfLoader") +register("acquire", "AcquireLoader") register("tar", "TarLoader") register("vmx", "VmxLoader") register("vmwarevm", "VmwarevmLoader") diff --git a/dissect/target/loaders/acquire.py b/dissect/target/loaders/acquire.py new file mode 100644 index 000000000..04bcb1a47 --- /dev/null +++ b/dissect/target/loaders/acquire.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import logging +import zipfile +from pathlib import Path + +from dissect.target.filesystems.tar import TarFilesystem +from dissect.target.filesystems.zip import ZipFilesystem +from dissect.target.loader import Loader +from dissect.target.loaders.dir import find_and_map_dirs +from dissect.target.target import Target + +log = logging.getLogger(__name__) + +FILESYSTEMS_ROOT = "fs" +FILESYSTEMS_LEGACY_ROOT = "sysvol" + + +def _get_root(path: Path) -> Path | None: + if path.is_file(): + fh = path.open("rb") + if TarFilesystem._detect(fh): + return TarFilesystem(fh).path() + + if ZipFilesystem._detect(fh): + return zipfile.Path(path.open("rb")) + + return None + + +class AcquireLoader(Loader): + def __init__(self, path: Path, **kwargs): + super().__init__(path) + + self.root = _get_root(path) + + @staticmethod + def detect(path: Path) -> bool: + root = _get_root(path) + + if not root: + return False + + return root.joinpath(FILESYSTEMS_ROOT).exists() or root.joinpath(FILESYSTEMS_LEGACY_ROOT).exists() + + def map(self, target: Target) -> None: + # Handle both root dir 'fs' and 'sysvol' (legacy) + fs_root = self.root + if fs_root.joinpath(FILESYSTEMS_ROOT).exists(): + fs_root = fs_root.joinpath(FILESYSTEMS_ROOT) + + find_and_map_dirs(target, fs_root) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index ed279f658..ff8621b21 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re import zipfile from collections import defaultdict from pathlib import Path @@ -7,6 +8,7 @@ from dissect.target.filesystem import LayerFilesystem from dissect.target.filesystems.dir import DirectoryFilesystem +from dissect.target.filesystems.tar import TarFilesystem from dissect.target.filesystems.zip import ZipFilesystem from dissect.target.helpers import loaderutil from dissect.target.loader import Loader @@ -16,6 +18,7 @@ from dissect.target import Target PREFIXES = ["", "fs"] +ANON_FS_RE = re.compile(r"^fs[0-9]+$") class DirLoader(Loader): @@ -43,6 +46,7 @@ def map_dirs( *, dirfs: type[DirectoryFilesystem] = DirectoryFilesystem, zipfs: type[ZipFilesystem] = ZipFilesystem, + tarfs: type[TarFilesystem] = TarFilesystem, **kwargs, ) -> None: """Map directories as filesystems into the given target. @@ -53,6 +57,7 @@ def map_dirs( os_type: The operating system type, used to determine how the filesystem should be mounted. dirfs: The filesystem class to use for directory filesystems. zipfs: The filesystem class to use for ZIP filesystems. + tarfs: The filesystem class to use for TAR filesystems. """ alt_separator = "" case_sensitive = True @@ -70,6 +75,8 @@ def map_dirs( if isinstance(path, zipfile.Path): dfs = zipfs(path.root.fp, path.at, alt_separator=alt_separator, case_sensitive=case_sensitive) + elif hasattr(path, "_fs") and isinstance(path._fs, TarFilesystem): + dfs = tarfs(path._fs.tar.fileobj, str(path), alt_separator=alt_separator, case_sensitive=case_sensitive) else: dfs = dirfs(path, alt_separator=alt_separator, case_sensitive=case_sensitive) @@ -86,7 +93,10 @@ def map_dirs( vfs = dfs[0] fs_to_add.append(vfs) - target.fs.mount(drive_letter.lower() + ":", vfs) + mount_letter = drive_letter.lower() + if mount_letter != "$fs$": + mount_letter += ":" + target.fs.mount(mount_letter, vfs) else: fs_to_add.extend(dfs) @@ -130,12 +140,21 @@ def find_dirs(path: Path) -> tuple[str, list[Path]]: for p in path.iterdir(): # Look for directories like C or C: if p.is_dir() and (is_drive_letter_path(p) or p.name in ("sysvol", "$rootfs$")): - dirs.append(p) + if p.name == "sysvol": + dirs.append(('c', p)) + else: + dirs.append((p.name[0], p)) if not os_type: os_type = os_type_from_path(p) - if not os_type: + if p.name == "$fs$": + dirs.append(('$fs$', p)) + for anon_fs in p.iterdir(): + if ANON_FS_RE.match(anon_fs.name): + dirs.append(anon_fs) + + if len(dirs) == 0: os_type = os_type_from_path(path) dirs = [path] diff --git a/dissect/target/loaders/kape.py b/dissect/target/loaders/kape.py index 71bc6394c..a8809bdaf 100644 --- a/dissect/target/loaders/kape.py +++ b/dissect/target/loaders/kape.py @@ -26,7 +26,7 @@ class KapeLoader(DirLoader): def detect(path: Path) -> bool: os_type, dirs = find_dirs(path) if os_type == OperatingSystem.WINDOWS: - for dir_path in dirs: + for volume, dir_path in dirs: for path in USNJRNL_PATHS: if dir_path.joinpath(path).exists(): return True diff --git a/dissect/target/loaders/tanium.py b/dissect/target/loaders/tanium.py index 1c59543bd..389ec44dc 100644 --- a/dissect/target/loaders/tanium.py +++ b/dissect/target/loaders/tanium.py @@ -33,7 +33,7 @@ def detect(path: Path) -> bool: file_path = path.joinpath("file") os_type, dirs = find_dirs(file_path) if file_path and os_type == OperatingSystem.WINDOWS: - for path in dirs: + for volume, path in dirs: # Tanium doesn't have the correct filenames for several files, like $J if path.joinpath("$Extend/$UsnJrnl_$J").exists(): return True diff --git a/dissect/target/loaders/tar.py b/dissect/target/loaders/tar.py index 092cd1a31..d06346f52 100644 --- a/dissect/target/loaders/tar.py +++ b/dissect/target/loaders/tar.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import re import tarfile from pathlib import Path @@ -16,9 +15,6 @@ log = logging.getLogger(__name__) -ANON_FS_RE = re.compile(r"^fs[0-9]+$") - - class TarLoader(Loader): """Load tar files.""" @@ -50,52 +46,14 @@ def map(self, target: target.Target) -> None: if member.name == ".": continue - if not member.name.startswith(("/fs/", "fs/", "/sysvol/", "sysvol/")): - # Not an acquire tar - if "/" not in volumes: - vol = filesystem.VirtualFilesystem(case_sensitive=True) - vol.tar = self.tar - volumes["/"] = vol - target.filesystems.add(vol) - - volume = volumes["/"] - mname = member.name - else: - if member.name.startswith(("/fs/", "fs/")): - # Current acquire - parts = member.name.replace("fs/", "").split("/") - if parts[0] == "": - parts.pop(0) - else: - # Legacy acquire - parts = member.name.lstrip("/").split("/") - volume_name = parts[0].lower() - - # NOTE: older versions of acquire would write to "sysvol" instead of a driver letter - # Figuring out the sysvol from the drive letters is easier than the drive letter from "sysvol", - # so this was swapped in acquire 3.12. Now we map all volumes to a drive letter and let the - # Windows OS plugin figure out which is the sysvol - # For backwards compatibility we're forced to keep this check, and assume that "c:" is our sysvol - if volume_name == "sysvol": - volume_name = "c:" - - if volume_name == "$fs$": - if len(parts) == 1: - # The fs/$fs$ entry is ignored, only the directories below it are processed. - continue - fs_name = parts[1] - if ANON_FS_RE.match(fs_name): - parts.pop(0) - volume_name = f"{volume_name}/{fs_name}" - - if volume_name not in volumes: - vol = filesystem.VirtualFilesystem(case_sensitive=False) - vol.tar = self.tar - volumes[volume_name] = vol - target.filesystems.add(vol) - - volume = volumes[volume_name] - mname = "/".join(parts[1:]) + if "/" not in volumes: + vol = filesystem.VirtualFilesystem(case_sensitive=True) + vol.tar = self.tar + volumes["/"] = vol + target.filesystems.add(vol) + + volume = volumes["/"] + mname = member.name entry_cls = TarFilesystemDirectoryEntry if member.isdir() else TarFilesystemEntry entry = entry_cls(volume, fsutil.normpath(mname), member) @@ -107,7 +65,6 @@ def map(self, target: target.Target) -> None: vol, usnjrnl_path=[ "$Extend/$Usnjrnl:$J", - "$Extend/$Usnjrnl:J", # Old versions of acquire used $Usnjrnl:J ], ) diff --git a/tests/_data/loaders/tar/test-anon-filesystems.tar b/tests/_data/loaders/acquire/test-anon-filesystems.tar similarity index 100% rename from tests/_data/loaders/tar/test-anon-filesystems.tar rename to tests/_data/loaders/acquire/test-anon-filesystems.tar diff --git a/tests/_data/loaders/tar/test-windows-fs-c-absolute.tar b/tests/_data/loaders/acquire/test-windows-fs-c-absolute.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-fs-c-absolute.tar rename to tests/_data/loaders/acquire/test-windows-fs-c-absolute.tar diff --git a/tests/_data/loaders/tar/test-windows-fs-c-relative.tar b/tests/_data/loaders/acquire/test-windows-fs-c-relative.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-fs-c-relative.tar rename to tests/_data/loaders/acquire/test-windows-fs-c-relative.tar diff --git a/tests/_data/loaders/acquire/test-windows-fs-c.zip b/tests/_data/loaders/acquire/test-windows-fs-c.zip new file mode 100644 index 000000000..9e1300062 --- /dev/null +++ b/tests/_data/loaders/acquire/test-windows-fs-c.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:fa791d1d27f0888e1d799ce0e818a1b21f9372bbab99e3ab818d3ee9f2bd3c1e +size 2418 diff --git a/tests/_data/loaders/tar/test-windows-fs-x.tar b/tests/_data/loaders/acquire/test-windows-fs-x.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-fs-x.tar rename to tests/_data/loaders/acquire/test-windows-fs-x.tar diff --git a/tests/_data/loaders/tar/test-windows-sysvol-absolute.tar b/tests/_data/loaders/acquire/test-windows-sysvol-absolute.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-sysvol-absolute.tar rename to tests/_data/loaders/acquire/test-windows-sysvol-absolute.tar diff --git a/tests/_data/loaders/tar/test-windows-sysvol-relative.tar b/tests/_data/loaders/acquire/test-windows-sysvol-relative.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-sysvol-relative.tar rename to tests/_data/loaders/acquire/test-windows-sysvol-relative.tar diff --git a/tests/_data/loaders/tar/uppercase_driveletter.tar b/tests/_data/loaders/acquire/uppercase_driveletter.tar similarity index 100% rename from tests/_data/loaders/tar/uppercase_driveletter.tar rename to tests/_data/loaders/acquire/uppercase_driveletter.tar diff --git a/tests/loaders/test_acquire.py b/tests/loaders/test_acquire.py new file mode 100644 index 000000000..6e6f51a06 --- /dev/null +++ b/tests/loaders/test_acquire.py @@ -0,0 +1,65 @@ +from pathlib import Path + +import pytest + +from dissect.target import Target +from dissect.target.loaders.acquire import AcquireLoader +from dissect.target.plugins.os.windows._os import WindowsPlugin +from tests._utils import absolute_path + + +def test_tar_sensitive_drive_letter(target_bare: Target) -> None: + tar_file = absolute_path("_data/loaders/acquire/uppercase_driveletter.tar") + + loader = AcquireLoader(Path(tar_file)) + assert loader.detect(Path(tar_file)) + loader.map(target_bare) + + # mounts = c: + assert sorted(target_bare.fs.mounts.keys()) == ["c:"] + + # Initialize our own WindowsPlugin to override the detection + target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"]) + target_bare._init_os() + + # sysvol is now added + assert sorted(target_bare.fs.mounts.keys()) == ["c:", "sysvol"] + + # WindowsPlugin sets the case sensitivity to False + assert target_bare.fs.get("C:/test.file").open().read() == b"hello_world" + assert target_bare.fs.get("c:/test.file").open().read() == b"hello_world" + + +@pytest.mark.parametrize( + "archive, expected_drive_letter", + [ + ("_data/loaders/acquire/test-windows-sysvol-absolute.tar", "c:"), # C: due to backwards compatibility + ("_data/loaders/acquire/test-windows-sysvol-relative.tar", "c:"), # C: due to backwards compatibility + ("_data/loaders/acquire/test-windows-fs-c-relative.tar", "c:"), + ("_data/loaders/acquire/test-windows-fs-c-absolute.tar", "c:"), + ("_data/loaders/acquire/test-windows-fs-x.tar", "x:"), + ("_data/loaders/acquire/test-windows-fs-c.zip", "c:"), + ], +) +def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None: + path = Path(absolute_path(archive)) + assert AcquireLoader.detect(path) + + loader = AcquireLoader(path) + loader.map(target_default) + + assert WindowsPlugin.detect(target_default) + # NOTE: for the sysvol archives, this also tests the backwards compatibility + assert sorted(target_default.fs.mounts.keys()) == [expected_drive_letter] + assert target_default.fs.get(f"{expected_drive_letter}/Windows/System32/foo.txt") + + +def test_tar_anonymous_filesystems(target_default: Target) -> None: + tar_file = Path(absolute_path("_data/loaders/acquire/test-anon-filesystems.tar")) + assert AcquireLoader.detect(tar_file) + + loader = AcquireLoader(tar_file) + loader.map(target_default) + + assert target_default.fs.get("$fs$/fs0/foo").open().read() == b"hello world\n" + assert target_default.fs.get("$fs$/fs1/bar").open().read() == b"hello world\n" diff --git a/tests/loaders/test_tar.py b/tests/loaders/test_tar.py index a591b3c21..a3d6cc0cf 100644 --- a/tests/loaders/test_tar.py +++ b/tests/loaders/test_tar.py @@ -2,12 +2,18 @@ from dissect.target import Target from dissect.target.loaders.tar import TarLoader -from dissect.target.plugins.os.windows._os import WindowsPlugin from tests._utils import absolute_path -def test_tar_loader_compressed_tar_file(target_win: Target) -> None: - archive_path = absolute_path("_data/loaders/tar/test-archive.tar.gz") +@pytest.mark.parametrize( + "archive", + [ + "_data/loaders/tar/test-archive.tar", + "_data/loaders/tar/test-archive.tar.gz", + ], +) +def test_tar_loader_compressed_tar_file(target_win: Target, archive) -> None: + archive_path = absolute_path(archive) loader = TarLoader(archive_path) loader.map(target_win) @@ -20,28 +26,6 @@ def test_tar_loader_compressed_tar_file(target_win: Target) -> None: assert test_file.open().read() == b"test-value\n" -def test_tar_sensitive_drive_letter(target_bare: Target) -> None: - tar_file = absolute_path("_data/loaders/tar/uppercase_driveletter.tar") - - loader = TarLoader(tar_file) - loader.map(target_bare) - - # mounts = / and c: - assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:"] - assert "C:" not in target_bare.fs.mounts.keys() - - # Initialize our own WindowsPlugin to override the detection - target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"]) - target_bare._init_os() - - # sysvol is now added - assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:", "sysvol"] - - # WindowsPlugin sets the case sensitivity to False - assert target_bare.fs.get("C:/test.file").open().read() == b"hello_world" - assert target_bare.fs.get("c:/test.file").open().read() == b"hello_world" - - def test_tar_loader_compressed_tar_file_with_empty_dir(target_unix: Target) -> None: archive_path = absolute_path("_data/loaders/tar/test-archive-empty-folder.tgz") loader = TarLoader(archive_path) @@ -55,37 +39,3 @@ def test_tar_loader_compressed_tar_file_with_empty_dir(target_unix: Target) -> N empty_folder = target_unix.fs.path("test/empty_dir") assert empty_folder.exists() assert empty_folder.is_dir() - - -@pytest.mark.parametrize( - "archive, expected_drive_letter", - [ - ("_data/loaders/tar/test-windows-sysvol-absolute.tar", "c:"), # C: due to backwards compatibility - ("_data/loaders/tar/test-windows-sysvol-relative.tar", "c:"), # C: due to backwards compatibility - ("_data/loaders/tar/test-windows-fs-c-relative.tar", "c:"), - ("_data/loaders/tar/test-windows-fs-c-absolute.tar", "c:"), - ("_data/loaders/tar/test-windows-fs-x.tar", "x:"), - ], -) -def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None: - loader = TarLoader(absolute_path(archive)) - loader.map(target_default) - - assert WindowsPlugin.detect(target_default) - # NOTE: for the sysvol archives, this also tests the backwards compatibility - assert sorted(target_default.fs.mounts.keys()) == [expected_drive_letter] - - -def test_tar_anonymous_filesystems(target_default: Target) -> None: - tar_file = absolute_path("_data/loaders/tar/test-anon-filesystems.tar") - - loader = TarLoader(tar_file) - loader.map(target_default) - - # mounts = $fs$/fs0, $fs$/fs1 and / - assert len(target_default.fs.mounts) == 3 - assert "$fs$/fs0" in target_default.fs.mounts.keys() - assert "$fs$/fs1" in target_default.fs.mounts.keys() - assert "/" in target_default.fs.mounts.keys() - assert target_default.fs.get("$fs$/fs0/foo").open().read() == b"hello world\n" - assert target_default.fs.get("$fs$/fs1/bar").open().read() == b"hello world\n"