Skip to content

Commit

Permalink
fix #1
Browse files Browse the repository at this point in the history
missing out of parent path check on decryption
  • Loading branch information
matthiashuschle committed May 24, 2020
1 parent dcf2304 commit da0644c
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 28 deletions.
2 changes: 2 additions & 0 deletions build_and_upload.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
python3 setup.py sdist bdist_wheel
python3 -m twine upload dist/*
44 changes: 20 additions & 24 deletions cryp_to_go/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
CryptoHandler,
AsymKey,
)
from .path_handler import SubPath

T = TypeVar('T')
_KEY_DERIVATION_SETUP = 'key_derivation_setup'
Expand Down Expand Up @@ -121,42 +122,37 @@ def store_files(self, file_list: Sequence[str]) -> Mapping[str, Tuple[int, bytes
- elements of `file_list` must be absolute file paths
- only paths in or below current working directory are allowed
"""
cwd = pathlib.PurePath(os.getcwd())
file_list = [os.path.join(str(cwd), x) if not os.path.isabs(x) else x for x in file_list]
paths = [pathlib.Path(x) for x in file_list]
if not all(x.exists() for x in paths):
file_list = [SubPath.from_any_path(x, pathlib.Path(os.getcwd())) for x in file_list]
# file_list = [os.path.join(str(cwd), x) if not os.path.isabs(x) else x for x in file_list]
# paths = [pathlib.Path(x) for x in file_list]
if not all(x.relative_path.exists() for x in file_list):
raise OSError(
'missing file: ' + repr([x for x in file_list if not pathlib.Path(x).exists()]))
relative_paths = [pathlib.PurePath(x).relative_to(cwd) for x in file_list]
if any(x.is_dir() for x in paths):
'missing file: ' + repr([str(x) for x in file_list if not x.relative_path.exists()]))
# relative_paths = [pathlib.PurePath(x).relative_to(cwd) for x in file_list]
if any(x.relative_path.is_dir() for x in file_list):
raise OSError(
'target is not a file: ' + repr([x for x in paths if x.is_dir()]))
if any('..' in x.parts for x in relative_paths):
raise ValueError(
'".." not allowed in target path: '
+ repr([x for x in relative_paths if '..' in x.parts])
)
'target is not a file: ' + repr([str(x) for x in file_list if x.relative_path.is_dir()]))
enc_info = {}
for file in relative_paths:
for file in file_list:
file_id = self.store_single_file(file)
enc_info[file] = (file_id, self.crypto_handler.signature)
enc_info[str(file)] = (file_id, self.crypto_handler.signature)
return enc_info

@contextmanager
def _reader_encrypt_file(
self,
file: str,
file: SubPath,
outfile: Union[str, None] = None
) -> Generator[Tuple[SqliteDatabase, Files, BinaryIO], None, None]:
""" Context manager for file encryption stream to Chunks table.
"""
with self.sql_handler.open_db() as db:
with db.atomic():
file_entry = Files.create(
path=self.crypto_handler.encrypt_snippet(inflate_string(str(file))),
path=self.crypto_handler.encrypt_snippet(inflate_string(file.slashed_string)),
encrypted_file_path=outfile,
)
with open(file, 'rb') as stream_in:
with open(str(file), 'rb') as stream_in:
with self.crypto_handler.create_signature():
yield db, file_entry, stream_in

Expand Down Expand Up @@ -229,7 +225,7 @@ def _db_store_enc_stream_in(self, file_entry: Files, stream_in: BinaryIO, db: Sq
file_entry.n_chunks = n_chunks
file_entry.save()

def store_single_file(self, file: str, outfile: Union[str, None] = None) -> int:
def store_single_file(self, file: SubPath, outfile: Union[str, None] = None) -> int:
""" Store encrypted file content in Files and Chunks.
Returns created Files.file_id.
Expand Down Expand Up @@ -289,11 +285,11 @@ def _restore_single_file_physical(self, file: Files, signature: Union[None, byte
Returns path.
"""
target_path = deflate_string(self.crypto_handler.decrypt_snippet(file.path))
dirname = os.path.dirname(target_path)
if len(dirname):
target_path = SubPath(deflate_string(self.crypto_handler.decrypt_snippet(file.path)))
dirname = target_path.relative_path.parent
if dirname != '.':
os.makedirs(dirname, exist_ok=True)
with open(target_path, 'wb') as f_out:
with open(str(target_path), 'wb') as f_out:
with self.crypto_handler.verify_signature(signature):
if file.encrypted_file_path:
with open(file.encrypted_file_path, 'rb') as f_in:
Expand All @@ -302,7 +298,7 @@ def _restore_single_file_physical(self, file: Files, signature: Union[None, byte
else:
for row in self._iter_file_chunks(file):
f_out.write(self.crypto_handler.decrypt_chunk(row.content))
return target_path
return str(target_path)

@staticmethod
def _iter_file_chunks(file: Files) -> Generator[Chunks, None, None]:
Expand Down
54 changes: 54 additions & 0 deletions cryp_to_go/path_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
""" Keep subpath handling consistent.
As files should not be written outside of CWD, this is a security issue.
Paths may be provided as absolute or relative, and they may be manipulated
inside the storage files. So the check should happen every time a path is
actually used.
"""
import os
import pathlib
from typing import TypeVar

AnyPath = TypeVar("AnyPath", str, pathlib.Path)


class SubPath:

def __init__(self, relative_path: AnyPath):
""" Wrapper for pathlib.Path that only allows relative paths without .. elements. """
self.relative_path = self.to_path(relative_path)
if self.relative_path.is_absolute():
raise ValueError("only relative paths allowed here! (%s)" % relative_path)
if '..' in self.relative_path.parts:
raise ValueError("'..' not allowed in SubPath! (%s)" % relative_path)

def __str__(self) -> str:
""" relative string representation. """
return str(self.relative_path)

@staticmethod
def to_path(path: AnyPath) -> pathlib.Path:
""" Pure conversion of string or Path to Path. """
print(path)
if not isinstance(path, pathlib.Path):
return pathlib.Path(path)
return path

def absolute_path(self, parent: AnyPath) -> pathlib.Path:
""" Transform to absolute. """
return self.to_path(parent) / self.relative_path

@classmethod
def from_any_path(cls, path: AnyPath, parent: AnyPath) -> "SubPath":
""" Create from absolute or relative path. """
abs_path = cls.to_path(os.path.abspath(path))
abs_parent = cls.to_path(os.path.abspath(parent))
return cls(abs_path.relative_to(abs_parent))

@property
def slashed_string(self) -> str:
""" '/'-separated string representation.
Intended for platform-independent storage.
"""
return '/'.join(self.relative_path.parts)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name='cryp-to-go',
version='0.1.1',
version='0.2.0',
packages=find_packages(),
install_requires=['pynacl>=1.3.0', 'cryptography>=2.8', 'peewee'],
url='https://github.com/matthiashuschle/cryp-to-go',
Expand Down
9 changes: 6 additions & 3 deletions tests/test_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def _touch():
)
return path_private_key, path_public_key


@contextmanager
def temporary_file_structure():
try:
Expand Down Expand Up @@ -181,9 +182,11 @@ def resolve(basename):
inst.store_files([resolve('subdir'), resolve('foo100.dat')])
assert 'subdir' in exc_info.value.args[0]
assert 'foo100' not in exc_info.value.args[0]
with pytest.raises(ValueError, match='not allowed') as exc_info:
inst.store_files([resolve(os.path.join('subdir', '..', 'subdir', 'foo300.dat')), resolve('foo100.dat')])
assert '..' in exc_info.value.args[0]
with pytest.raises(ValueError, match='does not start with') as exc_info:
inst.store_files([resolve(
os.path.join('subdir', '..', '..', 'rogue_dir', 'foo300.dat')
), resolve('foo100.dat')])
assert 'foo300' in exc_info.value.args[0]
assert 'foo100' not in exc_info.value.args[0]

# test actual file storage
Expand Down
56 changes: 56 additions & 0 deletions tests/test_path_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import pathlib
import pytest
from cryp_to_go import path_handler


def test_init():
inst = path_handler.SubPath('foo/bar')
assert str(inst.relative_path) == 'foo/bar'
inst = path_handler.SubPath(pathlib.Path('foo/bar'))
assert str(inst.relative_path) == 'foo/bar'
with pytest.raises(ValueError, match="only relative"):
path_handler.SubPath('/foo/bar')
with pytest.raises(ValueError, match="not allowed"):
path_handler.SubPath('foo/../bar')


@pytest.mark.parametrize("input_path,target", [
('foo/bar', 'foo/bar'),
(pathlib.Path('foo/bar'), 'foo/bar'),
('/foo/bar', '/foo/bar'),
(pathlib.Path('/foo/bar'), '/foo/bar'),
])
def test_to_path(input_path, target):
path = path_handler.SubPath.to_path(input_path)
assert isinstance(path, pathlib.Path)
assert str(path) == target


def test_str():
path = path_handler.SubPath('foo/bar')
assert str(path) == 'foo/bar'


@pytest.mark.parametrize("path_parent", ['/foo', pathlib.Path('/foo')])
def test_absolute_path(path_parent):
path_rel = path_handler.SubPath('bar/bar')
path_abs = path_rel.absolute_path(path_parent)
assert isinstance(path_abs, pathlib.Path)
assert str(path_abs) == '/foo/bar/bar'


@pytest.mark.parametrize("path", ['foo/bar', pathlib.Path('foo/bar')])
def test_from_any_path(path):
subpath = path_handler.SubPath(path)
assert isinstance(subpath, path_handler.SubPath)
assert str(subpath) == 'foo/bar'


def test_slashed_string():
subpath = path_handler.SubPath('foo')
assert subpath.slashed_string == 'foo'
# overwrite internal relative path with PurePath in different flavors
subpath.relative_path = pathlib.PurePosixPath('foo/bar')
assert subpath.slashed_string == 'foo/bar'
subpath.relative_path = pathlib.PureWindowsPath(r'foo\bar')
assert subpath.slashed_string == 'foo/bar'

0 comments on commit da0644c

Please sign in to comment.