From da0644cfc266e34581599b03e8a0b854f7c8e67c Mon Sep 17 00:00:00 2001 From: Matthias Huschle Date: Sun, 24 May 2020 20:50:58 +0200 Subject: [PATCH] fix #1 missing out of parent path check on decryption --- build_and_upload.sh | 2 ++ cryp_to_go/interface.py | 44 ++++++++++++++---------------- cryp_to_go/path_handler.py | 54 ++++++++++++++++++++++++++++++++++++ setup.py | 2 +- tests/test_interface.py | 9 ++++-- tests/test_path_handler.py | 56 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 139 insertions(+), 28 deletions(-) create mode 100644 build_and_upload.sh create mode 100644 cryp_to_go/path_handler.py create mode 100644 tests/test_path_handler.py diff --git a/build_and_upload.sh b/build_and_upload.sh new file mode 100644 index 0000000..4b709a3 --- /dev/null +++ b/build_and_upload.sh @@ -0,0 +1,2 @@ +python3 setup.py sdist bdist_wheel +python3 -m twine upload dist/* diff --git a/cryp_to_go/interface.py b/cryp_to_go/interface.py index af67b05..c4534f6 100644 --- a/cryp_to_go/interface.py +++ b/cryp_to_go/interface.py @@ -14,6 +14,7 @@ CryptoHandler, AsymKey, ) +from .path_handler import SubPath T = TypeVar('T') _KEY_DERIVATION_SETUP = 'key_derivation_setup' @@ -121,31 +122,26 @@ def store_files(self, file_list: Sequence[str]) -> Mapping[str, Tuple[int, bytes - elements of `file_list` must be absolute file paths - only paths in or below current working directory are allowed """ - cwd = pathlib.PurePath(os.getcwd()) - file_list = [os.path.join(str(cwd), x) if not os.path.isabs(x) else x for x in file_list] - paths = [pathlib.Path(x) for x in file_list] - if not all(x.exists() for x in paths): + file_list = [SubPath.from_any_path(x, pathlib.Path(os.getcwd())) for x in file_list] + # file_list = [os.path.join(str(cwd), x) if not os.path.isabs(x) else x for x in file_list] + # paths = [pathlib.Path(x) for x in file_list] + if not all(x.relative_path.exists() for x in file_list): raise OSError( - 'missing file: ' + repr([x for x in file_list if not pathlib.Path(x).exists()])) - relative_paths = [pathlib.PurePath(x).relative_to(cwd) for x in file_list] - if any(x.is_dir() for x in paths): + 'missing file: ' + repr([str(x) for x in file_list if not x.relative_path.exists()])) + # relative_paths = [pathlib.PurePath(x).relative_to(cwd) for x in file_list] + if any(x.relative_path.is_dir() for x in file_list): raise OSError( - 'target is not a file: ' + repr([x for x in paths if x.is_dir()])) - if any('..' in x.parts for x in relative_paths): - raise ValueError( - '".." not allowed in target path: ' - + repr([x for x in relative_paths if '..' in x.parts]) - ) + 'target is not a file: ' + repr([str(x) for x in file_list if x.relative_path.is_dir()])) enc_info = {} - for file in relative_paths: + for file in file_list: file_id = self.store_single_file(file) - enc_info[file] = (file_id, self.crypto_handler.signature) + enc_info[str(file)] = (file_id, self.crypto_handler.signature) return enc_info @contextmanager def _reader_encrypt_file( self, - file: str, + file: SubPath, outfile: Union[str, None] = None ) -> Generator[Tuple[SqliteDatabase, Files, BinaryIO], None, None]: """ Context manager for file encryption stream to Chunks table. @@ -153,10 +149,10 @@ def _reader_encrypt_file( with self.sql_handler.open_db() as db: with db.atomic(): file_entry = Files.create( - path=self.crypto_handler.encrypt_snippet(inflate_string(str(file))), + path=self.crypto_handler.encrypt_snippet(inflate_string(file.slashed_string)), encrypted_file_path=outfile, ) - with open(file, 'rb') as stream_in: + with open(str(file), 'rb') as stream_in: with self.crypto_handler.create_signature(): yield db, file_entry, stream_in @@ -229,7 +225,7 @@ def _db_store_enc_stream_in(self, file_entry: Files, stream_in: BinaryIO, db: Sq file_entry.n_chunks = n_chunks file_entry.save() - def store_single_file(self, file: str, outfile: Union[str, None] = None) -> int: + def store_single_file(self, file: SubPath, outfile: Union[str, None] = None) -> int: """ Store encrypted file content in Files and Chunks. Returns created Files.file_id. @@ -289,11 +285,11 @@ def _restore_single_file_physical(self, file: Files, signature: Union[None, byte Returns path. """ - target_path = deflate_string(self.crypto_handler.decrypt_snippet(file.path)) - dirname = os.path.dirname(target_path) - if len(dirname): + target_path = SubPath(deflate_string(self.crypto_handler.decrypt_snippet(file.path))) + dirname = target_path.relative_path.parent + if dirname != '.': os.makedirs(dirname, exist_ok=True) - with open(target_path, 'wb') as f_out: + with open(str(target_path), 'wb') as f_out: with self.crypto_handler.verify_signature(signature): if file.encrypted_file_path: with open(file.encrypted_file_path, 'rb') as f_in: @@ -302,7 +298,7 @@ def _restore_single_file_physical(self, file: Files, signature: Union[None, byte else: for row in self._iter_file_chunks(file): f_out.write(self.crypto_handler.decrypt_chunk(row.content)) - return target_path + return str(target_path) @staticmethod def _iter_file_chunks(file: Files) -> Generator[Chunks, None, None]: diff --git a/cryp_to_go/path_handler.py b/cryp_to_go/path_handler.py new file mode 100644 index 0000000..16fcf9d --- /dev/null +++ b/cryp_to_go/path_handler.py @@ -0,0 +1,54 @@ +""" Keep subpath handling consistent. + +As files should not be written outside of CWD, this is a security issue. +Paths may be provided as absolute or relative, and they may be manipulated +inside the storage files. So the check should happen every time a path is +actually used. +""" +import os +import pathlib +from typing import TypeVar + +AnyPath = TypeVar("AnyPath", str, pathlib.Path) + + +class SubPath: + + def __init__(self, relative_path: AnyPath): + """ Wrapper for pathlib.Path that only allows relative paths without .. elements. """ + self.relative_path = self.to_path(relative_path) + if self.relative_path.is_absolute(): + raise ValueError("only relative paths allowed here! (%s)" % relative_path) + if '..' in self.relative_path.parts: + raise ValueError("'..' not allowed in SubPath! (%s)" % relative_path) + + def __str__(self) -> str: + """ relative string representation. """ + return str(self.relative_path) + + @staticmethod + def to_path(path: AnyPath) -> pathlib.Path: + """ Pure conversion of string or Path to Path. """ + print(path) + if not isinstance(path, pathlib.Path): + return pathlib.Path(path) + return path + + def absolute_path(self, parent: AnyPath) -> pathlib.Path: + """ Transform to absolute. """ + return self.to_path(parent) / self.relative_path + + @classmethod + def from_any_path(cls, path: AnyPath, parent: AnyPath) -> "SubPath": + """ Create from absolute or relative path. """ + abs_path = cls.to_path(os.path.abspath(path)) + abs_parent = cls.to_path(os.path.abspath(parent)) + return cls(abs_path.relative_to(abs_parent)) + + @property + def slashed_string(self) -> str: + """ '/'-separated string representation. + + Intended for platform-independent storage. + """ + return '/'.join(self.relative_path.parts) diff --git a/setup.py b/setup.py index eaed10c..de67ce9 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='cryp-to-go', - version='0.1.1', + version='0.2.0', packages=find_packages(), install_requires=['pynacl>=1.3.0', 'cryptography>=2.8', 'peewee'], url='https://github.com/matthiashuschle/cryp-to-go', diff --git a/tests/test_interface.py b/tests/test_interface.py index 7b12140..8f5e5bb 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -58,6 +58,7 @@ def _touch(): ) return path_private_key, path_public_key + @contextmanager def temporary_file_structure(): try: @@ -181,9 +182,11 @@ def resolve(basename): inst.store_files([resolve('subdir'), resolve('foo100.dat')]) assert 'subdir' in exc_info.value.args[0] assert 'foo100' not in exc_info.value.args[0] - with pytest.raises(ValueError, match='not allowed') as exc_info: - inst.store_files([resolve(os.path.join('subdir', '..', 'subdir', 'foo300.dat')), resolve('foo100.dat')]) - assert '..' in exc_info.value.args[0] + with pytest.raises(ValueError, match='does not start with') as exc_info: + inst.store_files([resolve( + os.path.join('subdir', '..', '..', 'rogue_dir', 'foo300.dat') + ), resolve('foo100.dat')]) + assert 'foo300' in exc_info.value.args[0] assert 'foo100' not in exc_info.value.args[0] # test actual file storage diff --git a/tests/test_path_handler.py b/tests/test_path_handler.py new file mode 100644 index 0000000..04bd95d --- /dev/null +++ b/tests/test_path_handler.py @@ -0,0 +1,56 @@ +import pathlib +import pytest +from cryp_to_go import path_handler + + +def test_init(): + inst = path_handler.SubPath('foo/bar') + assert str(inst.relative_path) == 'foo/bar' + inst = path_handler.SubPath(pathlib.Path('foo/bar')) + assert str(inst.relative_path) == 'foo/bar' + with pytest.raises(ValueError, match="only relative"): + path_handler.SubPath('/foo/bar') + with pytest.raises(ValueError, match="not allowed"): + path_handler.SubPath('foo/../bar') + + +@pytest.mark.parametrize("input_path,target", [ + ('foo/bar', 'foo/bar'), + (pathlib.Path('foo/bar'), 'foo/bar'), + ('/foo/bar', '/foo/bar'), + (pathlib.Path('/foo/bar'), '/foo/bar'), +]) +def test_to_path(input_path, target): + path = path_handler.SubPath.to_path(input_path) + assert isinstance(path, pathlib.Path) + assert str(path) == target + + +def test_str(): + path = path_handler.SubPath('foo/bar') + assert str(path) == 'foo/bar' + + +@pytest.mark.parametrize("path_parent", ['/foo', pathlib.Path('/foo')]) +def test_absolute_path(path_parent): + path_rel = path_handler.SubPath('bar/bar') + path_abs = path_rel.absolute_path(path_parent) + assert isinstance(path_abs, pathlib.Path) + assert str(path_abs) == '/foo/bar/bar' + + +@pytest.mark.parametrize("path", ['foo/bar', pathlib.Path('foo/bar')]) +def test_from_any_path(path): + subpath = path_handler.SubPath(path) + assert isinstance(subpath, path_handler.SubPath) + assert str(subpath) == 'foo/bar' + + +def test_slashed_string(): + subpath = path_handler.SubPath('foo') + assert subpath.slashed_string == 'foo' + # overwrite internal relative path with PurePath in different flavors + subpath.relative_path = pathlib.PurePosixPath('foo/bar') + assert subpath.slashed_string == 'foo/bar' + subpath.relative_path = pathlib.PureWindowsPath(r'foo\bar') + assert subpath.slashed_string == 'foo/bar'