Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add loader and child plugin for VBK files #1012

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions dissect/target/filesystems/vbk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from __future__ import annotations

import stat
from typing import BinaryIO, Iterator

from dissect.archive import vbk

from dissect.target import exceptions
from dissect.target.exceptions import (
NotASymlinkError,
)
from dissect.target.filesystem import (
Filesystem,
FilesystemEntry,
)
from dissect.target.helpers import fsutil


class VbkFilesystem(Filesystem):
"""Filesystem implementation for VBK files."""

__type__ = "vbk"

def __init__(self, fh: BinaryIO, *args, **kwargs):
super().__init__(fh, *args, **kwargs)

self.vbk = vbk.VBK(fh)
self._fs = None

@staticmethod
def _detect(fh: BinaryIO) -> bool:
try:
vbk.VBK(fh)
return True
except vbk.VBKError:
return False

def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry:
try:
return VbkFilesystemEntry(self, path.__str__(), self.vbk.get(path))
except FileNotFoundError:
raise exceptions.FileNotFoundError(f"File not found: {path}")


class VbkFilesystemEntry(FilesystemEntry):
fs: VbkFilesystem
entry: vbk.MetaItem

def get(self, path: str) -> FilesystemEntry:
return self.fs.get(fsutil.join(self.path, path, alt_separator=self.fs.alt_separator))

def iterdir(self) -> Iterator[str]:
for entry in self.entry.iterdir():
yield entry.name

def scandir(self) -> Iterator[FilesystemEntry]:
for entry in self.entry.iterdir():
path = fsutil.join(self.path, entry.name)
yield VbkFilesystemEntry(self.fs, path, entry)

def open(self) -> None:
return self.entry.open()

def is_dir(self, follow_symlinks: bool = True) -> bool:
return self.entry.is_dir()

def is_file(self, follow_symlinks: bool = True) -> bool:
return self.entry.is_file()

def is_symlink(self) -> bool:
return False

def readlink(self) -> str:
raise NotASymlinkError()

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
return self._resolve(follow_symlinks=follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
mode = stat.S_IFDIR if self.is_dir() else stat.S_IFREG
size = 0 if self.is_dir() else self.entry.size

# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
st_info = [
mode | 0o755,
fsutil.generate_addr(self.path, alt_separator=self.fs.alt_separator),
id(self.fs),
1,
0,
0,
size,
0,
0,
0,
]
return fsutil.stat_result(st_info)
1 change: 1 addition & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader:
register("ova", "OvaLoader")
register("vbox", "VBoxLoader")
register("vb", "VBLoader")
register("vbk", "VbkLoader")
register("xva", "XvaLoader")
register("vma", "VmaLoader")
register("kape", "KapeLoader")
Expand Down
45 changes: 45 additions & 0 deletions dissect/target/loaders/vbk.py
Matthijsy marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import re
from pathlib import Path
from typing import Iterator

from dissect.target import target, Target, container
from dissect.target.filesystems.vbk import VbkFilesystem
from dissect.target.loader import Loader
from dissect.target.loaders.vmx import VmxLoader


class VbkLoader(Loader):
"""Load Veaam Backup (VBK) files."""

def __init__(self, path, **kwargs):
super().__init__(path, **kwargs)
self.path = path

@staticmethod
def detect(path: Path) -> bool:
return path.suffix.lower() == ".vbk"

@staticmethod
def find_all(path: Path, **kwargs) -> Iterator[Path]:
vbkfs = VbkFilesystem(path.open("rb"))
for _, _, files in vbkfs.walk_ext("/"):
for file in files:
is_vmx = file.path.lower().endswith(".vmx")
is_disk = re.match(r'.{8}-.{4}-.{4}-.{4}-.{12}', file.name)

if is_vmx or is_disk:
yield vbkfs.get(file.path)

def map(self, target: target.Target) -> None:
is_vmx = self.path.name.lower().endswith(".vmx")
is_disk = re.match(r'.{8}-.{4}-.{4}-.{4}-.{12}', self.path.name)

if is_vmx:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Schamper I tried to implement this the way you proposed with the find_all. It works for plain disks within the VBK (altough I am not sure if this works correctly when there are multiple disks of the same machine in the VBK). But how to handle the VMX files? I would like not to re-write the logic the VmxLoader already implements again, but just calling the VmxLoader().map() does not seem to work. You got any pointers for me?

# TODO: how to open this vmx
#VmxLoader(self.path).map(target)
pass
if is_disk:
target.disks.add(self.path.open())



30 changes: 30 additions & 0 deletions dissect/target/plugins/child/vbk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import re
from typing import Iterator

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import ChildTargetRecord
from dissect.target.loaders.vbk import VBKLoader
from dissect.target.plugin import ChildTargetPlugin


class VBKChildTargetPlugin(ChildTargetPlugin):
"""Child target plugin that yields from VBK."""

__type__ = "vbk"

def check_compatible(self) -> None:
if not isinstance(self.target._loader, VBKLoader):
raise UnsupportedPluginError("Not an VBK File")

def list_children(self) -> Iterator[ChildTargetRecord]:
for _, _, files in self.target.fs.walk_ext("/"):
for file in files:
is_vmx = file.path.lower().endswith(".vmx")
is_disk = re.match(r'.{8}-.{4}-.{4}-.{4}-.{12}', file.name)

if is_vmx or is_disk:
yield ChildTargetRecord(
type=self.__type__,
path=file.path,
_target=self.target,
)
3 changes: 3 additions & 0 deletions tests/_data/loaders/vbk/test9.vbk
Git LFS file not shown
21 changes: 21 additions & 0 deletions tests/loaders/test_vbk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from pathlib import Path

from dissect.target.loaders.vbk import VbkLoader
from dissect.target.target import Target
from tests._utils import absolute_path


def test_vbk_loader(target_default: Target):
archive_path = Path(absolute_path("_data/loaders/vbk/test9.vbk"))

loader = VbkLoader(archive_path)
loader.map(target_default)
target_default.apply()

assert len(target_default.filesystems) == 1

test_file = target_default.fs.path(
"/6745a759-2205-4cd2-b172-8ec8f7e60ef8 (78a5467d-87f5-8540-9a84-7569ae2849ad_2d1bb20f-49c1-485d-a689-696693713a5a)/summary.xml")

assert test_file.exists()
assert len(test_file.open().read()) > 0