Skip to content

Commit

Permalink
check for tarballs containing sketchy symlinks
Browse files Browse the repository at this point in the history
It's acceptable for a tarball to have a symlink at `a/b/c/foo.txt` that
points to `../../../foo.txt` (see `legal_symlink_dots.tar`), because
that symlink "stays within" the archive. However, it should be illegal
for the same symlink to point to `../../../../foo.txt` (see
`illegal_symlink_dots.tar`), because that symlink "reaches outside" the
archive. Similarly, it should always be illegal for a tarball to hold a
symlink pointing to an absolute path.

Add validation and tests cases for these behaviors.
  • Loading branch information
oconnor663 committed Apr 5, 2023
1 parent 16b837d commit bc8de02
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 7 deletions.
37 changes: 30 additions & 7 deletions peru/resources/plugins/curl/curl_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,17 @@ def plugin_sync(url, sha1):

def extract_tar(archive_path, dest):
with tarfile.open(archive_path) as t:
validate_filenames(info.path for info in t.getmembers())
for info in t.getmembers():
validate_filename(info.path)
if info.issym():
validate_symlink(info.path, info.linkname)
t.extractall(dest)


def extract_zip(archive_path, dest):
with zipfile.ZipFile(archive_path) as z:
validate_filenames(z.namelist())
for name in z.namelist():
validate_filename(name)
z.extractall(dest)
# Set file permissions. Tar does this by default, but with zip we need
# to do it ourselves.
Expand All @@ -153,11 +157,30 @@ def extract_zip(archive_path, dest):
os.chmod(os.path.join(dest, info.filename), 0o755)


def validate_filenames(names):
for name in names:
path = pathlib.PurePosixPath(name)
if path.is_absolute() or '..' in path.parts:
raise EvilArchiveError('Illegal path in archive: ' + name)
def validate_filename(name):
path = pathlib.PurePosixPath(name)
if path.is_absolute() or ".." in path.parts:
raise EvilArchiveError("Illegal path in archive: " + name)


def validate_symlink(name, target):
# We might do this twice but that's fine.
validate_filename(name)

allowed_parent_parts = len(pathlib.PurePosixPath(name).parts) - 1

target_path = pathlib.PurePosixPath(target)
if target_path.is_absolute():
raise EvilArchiveError("Illegal symlink target in archive: " + target)
leading_parent_parts = 0
for part in target_path.parts:
if part != "..":
break
leading_parent_parts += 1
if leading_parent_parts > allowed_parent_parts:
raise EvilArchiveError("Illegal symlink target in archive: " + target)
if ".." in target_path.parts[leading_parent_parts:]:
raise EvilArchiveError("Illegal symlink target in archive: " + target)


class EvilArchiveError(RuntimeError):
Expand Down
Binary file added tests/resources/illegal_symlink_absolute.tar
Binary file not shown.
Binary file added tests/resources/illegal_symlink_dots.tar
Binary file not shown.
Binary file added tests/resources/legal_symlink_dots.tar
Binary file not shown.
15 changes: 15 additions & 0 deletions tests/test_curl_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ def test_evil_archives(self):
with self.assertRaises(curl_plugin.EvilArchiveError):
curl_plugin.extract_tar(str(tar_archive), dest)

def test_evil_symlink_archives(self):
"""Even worse than archives containing bad paths, an archive could
contain a *symlink* pointing to a bad path. Then a subsequent entry in
the *same* archive could write through the symlink."""
dest = shared.create_dir()
for case in ["illegal_symlink_dots", "illegal_symlink_absolute"]:
tar_archive = shared.test_resources / (case + ".tar")
with self.assertRaises(curl_plugin.EvilArchiveError):
curl_plugin.extract_tar(str(tar_archive), dest)
# But leading dots should be allowed in symlinks, as long as they don't
# escape the root of the archive.
for case in ["legal_symlink_dots"]:
tar_archive = shared.test_resources / (case + ".tar")
curl_plugin.extract_tar(str(tar_archive), dest)

def test_request_has_user_agent_header(self):
actual = curl_plugin.build_request("http://example.test")
self.assertTrue(actual.has_header("User-agent"))
Expand Down

0 comments on commit bc8de02

Please sign in to comment.