diff --git a/.gitignore b/.gitignore index ecf6be3..3f07002 100644 --- a/.gitignore +++ b/.gitignore @@ -68,7 +68,11 @@ target/ # Project specific tests/dst/* +tests/*_dst tests/test_logs/* !tests/**/.keepdir !tests/src_invalid/* !tests/src_valid/* +pdfid.py +# Plugins are pdfid stuff +plugin_* diff --git a/.travis.yml b/.travis.yml index b778bf8..786eeab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -66,8 +66,8 @@ install: - rm fraunhoferlibrary.zip - 7z x -p42 42.zip # Some random samples - - wget http://www.sample-videos.com/audio/mp3/india-national-anthem.mp3 - - wget http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4 + # - wget http://www.sample-videos.com/audio/mp3/india-national-anthem.mp3 + # - wget http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4 - wget http://thewalter.net/stef/software/rtfx/sample.rtf - popd diff --git a/README.md b/README.md index 19eb6d3..0f368ab 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,12 @@ [![Build Status](https://travis-ci.org/CIRCL/PyCIRCLean.svg?branch=master)](https://travis-ci.org/CIRCL/PyCIRCLean) [![codecov.io](https://codecov.io/github/CIRCL/PyCIRCLean/coverage.svg?branch=master)](https://codecov.io/github/CIRCL/PyCIRCLean?branch=master) -[![Coverage Status](https://coveralls.io/repos/github/Rafiot/PyCIRCLean/badge.svg?branch=master)](https://coveralls.io/github/Rafiot/PyCIRCLean?branch=master) # PyCIRCLean PyCIRCLean is the core Python code used by [CIRCLean](https://github.com/CIRCL/Circlean/), an open-source -USB key and document sanitizer created by [CIRCL](https://www.circl.lu/). This module has been separated from the -device-specific scripts and can be used for dedicated security applications to sanitize documents from hostile environments -to trusted environments. PyCIRCLean is currently Python 3.3+ only. +USB key and document sanitizer created by [CIRCL](https://www.circl.lu/). This module has been separated from the +device-specific scripts and can be used for dedicated security applications to sanitize documents from hostile environments +to trusted environments. PyCIRCLean is currently Python 3.3+ compatible. # Installation @@ -23,10 +22,13 @@ pip install . # How to use PyCIRCLean -PyCIRCLean is a simple Python library to handle file checking and sanitization. PyCIRCLean is designed as a simple library -that can be overloaded to cover specific checking and sanitization workflows in different organizations like industrial +PyCIRCLean is a simple Python library to handle file checking and sanitization. +PyCIRCLean is designed to be extended to cover specific checking +and sanitization workflows in different organizations such as industrial environments or restricted/classified ICT environments. A series of practical examples utilizing PyCIRCLean can be found -in the [./examples](./examples) directory. +in the [./examples](./examples) directory. Note: for commits beyond version 2.2.0 these +examples are not guaranteed to work with the PyCIRCLean API. Please check [helpers.py](./kittengroomer/helpers.py) or +[filecheck.py](./bin/filecheck.py) to see the new API interface. The following simple example using PyCIRCLean will only copy files with a .conf extension matching the 'text/plain' MIME type. If any other file is found in the source directory, the files won't be copied to the destination directory. @@ -41,94 +43,79 @@ from kittengroomer import FileBase, KittenGroomerBase, main # Extension -configfiles = {'.conf': 'text/plain'} +class Config: + configfiles = {'.conf': 'text/plain'} class FileSpec(FileBase): def __init__(self, src_path, dst_path): - ''' Init file object, set the extension ''' + """Init file object, set the extension.""" super(FileSpec, self).__init__(src_path, dst_path) + self.valid_files = {} a, self.extension = os.path.splitext(self.src_path) self.mimetype = magic.from_file(self.src_path, mime=True).decode("utf-8") + # The initial version will only accept the file extensions/mimetypes listed here. + self.valid_files.update(Config.configfiles) + + def check(self): + valid = True + expected_mime = self.valid_files.get(self.extension) + if expected_mime is None: + # Unexpected extension => disallowed + valid = False + compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys())) + elif self.mimetype != expected_mime: + # Unexpected mimetype => disallowed + valid = False + compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime) + self.add_log_details('valid', valid) + if valid: + self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype) + else: + self.should_copy = False + if compare_ext is not None: + self.add_log_string(compare_ext) + else: + self.add_log_string(compare_mime) + if self.should_copy: + self.safe_copy() + self.write_log() class KittenGroomerSpec(KittenGroomerBase): def __init__(self, root_src=None, root_dst=None): - ''' - Initialize the basics of the copy - ''' + """Initialize the basics of the copy.""" if root_src is None: root_src = os.path.join(os.sep, 'media', 'src') if root_dst is None: root_dst = os.path.join(os.sep, 'media', 'dst') super(KittenGroomerSpec, self).__init__(root_src, root_dst) - self.valid_files = {} - - # The initial version will only accept the file extensions/mimetypes listed here. - self.valid_files.update(configfiles) - - def _print_log(self): - ''' - Print the logs related to the current file being processed - ''' - tmp_log = self.log_name.fields(**self.cur_file.log_details) - if not self.cur_file.log_details.get('valid'): - tmp_log.warning(self.cur_file.log_string) - else: - tmp_log.debug(self.cur_file.log_string) def processdir(self): - ''' - Main function doing the processing - ''' + """Main function doing the processing.""" to_copy = [] error = [] for srcpath in self._list_all_files(self.src_root_dir): - valid = True - self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', '')) - self.cur_file = FileSpec(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir)) - expected_mime = self.valid_files.get(self.cur_file.extension) - if expected_mime is None: - # Unexpected extension => disallowed - valid = False - compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys())) - elif self.cur_file.mimetype != expected_mime: - # Unexpected mimetype => disallowed - valid = False - compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime) - self.cur_file.add_log_details('valid', valid) - if valid: - to_copy.append(self.cur_file) - self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype) - else: - error.append(self.cur_file) - if compare_ext is not None: - self.cur_file.log_string = compare_ext - else: - self.cur_file.log_string = compare_mime - if len(error) > 0: - for f in error + to_copy: - self.cur_file = f - self._print_log() - else: - for f in to_copy: - self.cur_file = f - self._safe_copy() - self._print_log() + dstpath = srcpath.replace(self.src_root_dir, self.dst_root_dir) + cur_file = FileSpec(srcpath, dstpath) + cur_file.check() if __name__ == '__main__': main(KittenGroomerSpec, ' Only copy some files, returns an error is anything else is found') - exit(0) + ~~~ # How to contribute -We welcome contributions (including bug fixes, new code workflows) via pull requests. We are interested in any new workflows -that can be used to improve security in different organizations. If you see any potential enhancements required to support -your sanitization workflow, please feel free to open an issue. Read [CONTRIBUTING.md](/CONTRIBUTING.md) for more information. +We welcome contributions (including bug fixes, new example file processing +workflows) via pull requests. We are particularly interested in any new workflows +that can be used to improve security in different organizations. If you see any +potential enhancements required to support your sanitization workflow, please feel +free to open an issue. Read [CONTRIBUTING.md](/CONTRIBUTING.md) for more +information. # License diff --git a/bin/filecheck.py b/bin/filecheck.py index 8440bd6..3dc26dd 100644 --- a/bin/filecheck.py +++ b/bin/filecheck.py @@ -5,105 +5,129 @@ import shlex import subprocess import zipfile +import argparse import oletools.oleid import olefile import officedissector - import warnings import exifread from PIL import Image # from PIL import PngImagePlugin - from pdfid import PDFiD, cPDFiD -from kittengroomer import FileBase, KittenGroomerBase, main +from kittengroomer import FileBase, KittenGroomerBase + SEVENZ_PATH = '/usr/bin/7z' -# Prepare application/ -mimes_ooxml = ['vnd.openxmlformats-officedocument.'] -mimes_office = ['msword', 'vnd.ms-'] -mimes_libreoffice = ['vnd.oasis.opendocument'] -mimes_rtf = ['rtf', 'richtext'] -mimes_pdf = ['pdf', 'postscript'] -mimes_xml = ['xml'] -mimes_ms = ['dosexec'] -mimes_compressed = ['zip', 'rar', 'bzip2', 'lzip', 'lzma', 'lzop', - 'xz', 'compress', 'gzip', 'tar'] -mimes_data = ['octet-stream'] - -# Prepare image/ -mimes_exif = ['image/jpeg', 'image/tiff'] -mimes_png = ['image/png'] - -# Mimetypes we can pull metadata from -mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png'] - -# Aliases -aliases = { - # Win executables - 'application/x-msdos-program': 'application/x-dosexec', - 'application/x-dosexec': 'application/x-msdos-program', - # Other apps with confusing mimetypes - 'application/rtf': 'text/rtf', -} - -# Sometimes, mimetypes.guess_type is giving unexpected results, such as for the .tar.gz files: -# In [12]: mimetypes.guess_type('toot.tar.gz', strict=False) -# Out[12]: ('application/x-tar', 'gzip') -# It works as expected if you do mimetypes.guess_type('application/gzip', strict=False) -propertype = {'.gz': 'application/gzip'} - -# Commonly used malicious extensions -# Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/ -# https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java -MAL_EXTS = ( - # Applications - ".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr", - ".hta", ".cpl", ".msc", ".jar", - # Scripts - ".bat", ".cmd", ".vb", ".vbs", ".vbe", ".js", ".jse", ".ws", ".wsf", - ".wsc", ".wsh", ".ps1", ".ps1xml", ".ps2", ".ps2xml", ".psc1", ".psc2", - ".msh", ".msh1", ".msh2", ".mshxml", ".msh1xml", ".msh2xml", - # Shortcuts - ".scf", ".lnk", ".inf", - # Other - ".reg", ".dll", - # Office macro (OOXML with macro enabled) - ".docm", ".dotm", ".xlsm", ".xltm", ".xlam", ".pptm", ".potm", ".ppam", - ".ppsm", ".sldm", - # banned from wirecode - ".asf", ".asx", ".au", ".htm", ".html", ".mht", ".vbs", - ".wax", ".wm", ".wma", ".wmd", ".wmv", ".wmx", ".wmz", ".wvx", -) +class Config: + """Configuration information for Filecheck.""" + + # Application subtypes (mimetype: 'application/') + mimes_ooxml = ['vnd.openxmlformats-officedocument.'] + mimes_office = ['msword', 'vnd.ms-'] + mimes_libreoffice = ['vnd.oasis.opendocument'] + mimes_rtf = ['rtf', 'richtext'] + mimes_pdf = ['pdf', 'postscript'] + mimes_xml = ['xml'] + mimes_ms = ['dosexec'] + mimes_compressed = ['zip', 'rar', 'bzip2', 'lzip', 'lzma', 'lzop', + 'xz', 'compress', 'gzip', 'tar'] + mimes_data = ['octet-stream'] + + # Image subtypes + mimes_exif = ['image/jpeg', 'image/tiff'] + mimes_png = ['image/png'] + + # Mimetypes with metadata + mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png'] + + # Commonly used malicious extensions + # Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/ + # https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java + malicious_exts = ( + # Applications + ".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr", + ".hta", ".cpl", ".msc", ".jar", + # Scripts + ".bat", ".cmd", ".vb", ".vbs", ".vbe", ".js", ".jse", ".ws", ".wsf", + ".wsc", ".wsh", ".ps1", ".ps1xml", ".ps2", ".ps2xml", ".psc1", ".psc2", + ".msh", ".msh1", ".msh2", ".mshxml", ".msh1xml", ".msh2xml", + # Shortcuts + ".scf", ".lnk", ".inf", + # Other + ".reg", ".dll", + # Office macro (OOXML with macro enabled) + ".docm", ".dotm", ".xlsm", ".xltm", ".xlam", ".pptm", ".potm", ".ppam", + ".ppsm", ".sldm", + # banned from wirecode + ".asf", ".asx", ".au", ".htm", ".html", ".mht", ".vbs", + ".wax", ".wm", ".wma", ".wmd", ".wmv", ".wmx", ".wmz", ".wvx", + ) + + # Aliases + aliases = { + # Win executables + 'application/x-msdos-program': 'application/x-dosexec', + 'application/x-dosexec': 'application/x-msdos-program', + # Other apps with confusing mimetypes + 'application/rtf': 'text/rtf', + } + + # Sometimes, mimetypes.guess_type gives unexpected results, such as for .tar.gz files: + # In [12]: mimetypes.guess_type('toot.tar.gz', strict=False) + # Out[12]: ('application/x-tar', 'gzip') + # It works as expected if you do mimetypes.guess_type('application/gzip', strict=False) + override_ext = {'.gz': 'application/gzip'} class File(FileBase): - def __init__(self, src_path, dst_path): - super(File, self).__init__(src_path, dst_path) + def __init__(self, src_path, dst_path, logger): + super(File, self).__init__(src_path, dst_path, logger) self.is_recursive = False - self._check_dangerous() - if self.is_dangerous(): - return - self.log_details.update({'maintype': self.main_type, - 'subtype': self.sub_type, - 'extension': self.extension}) - self._check_extension() - self._check_mime() + subtypes_apps = [ + (Config.mimes_office, self._winoffice), + (Config.mimes_ooxml, self._ooxml), + (Config.mimes_rtf, self.text), + (Config.mimes_libreoffice, self._libreoffice), + (Config.mimes_pdf, self._pdf), + (Config.mimes_xml, self.text), + (Config.mimes_ms, self._executables), + (Config.mimes_compressed, self._archive), + (Config.mimes_data, self._binary_app), + ] + self.app_subtype_methods = self._make_method_dict(subtypes_apps) + + types_metadata = [ + (Config.mimes_exif, self._metadata_exif), + (Config.mimes_png, self._metadata_png), + ] + self.metadata_mimetype_methods = self._make_method_dict(types_metadata) + + self.mime_processing_options = { + 'text': self.text, + 'audio': self.audio, + 'image': self.image, + 'video': self.video, + 'application': self.application, + 'example': self.example, + 'message': self.message, + 'model': self.model, + 'multipart': self.multipart, + 'inode': self.inode, + } def _check_dangerous(self): - if not self.has_mimetype(): - # No mimetype, should not happen. - self.make_dangerous() - if not self.has_extension(): - self.make_dangerous() - if self.extension in MAL_EXTS: - self.log_details.update({'malicious_extension': self.extension}) - self.make_dangerous() + if not self.has_mimetype: + self.make_dangerous('no mimetype') + if not self.has_extension: + self.make_dangerous('no extension') + if self.extension in Config.malicious_exts: + self.make_dangerous('malicious_extension') def _check_extension(self): """Guesses the file's mimetype based on its extension. If the file's @@ -111,474 +135,432 @@ def _check_extension(self): module's list of valid mimetypes and the expected mimetype based on its extension differs from the mimetype determined by libmagic, then it marks the file as dangerous.""" - if propertype.get(self.extension) is not None: - expected_mimetype = propertype.get(self.extension) + if self.extension in Config.override_ext: + expected_mimetype = Config.override_ext[self.extension] else: - expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False) - if aliases.get(expected_mimetype) is not None: - expected_mimetype = aliases.get(expected_mimetype) + expected_mimetype, encoding = mimetypes.guess_type(self.src_path, + strict=False) + if expected_mimetype in Config.aliases: + expected_mimetype = Config.aliases[expected_mimetype] is_known_extension = self.extension in mimetypes.types_map.keys() if is_known_extension and expected_mimetype != self.mimetype: - self.log_details.update({'expected_mimetype': expected_mimetype}) - self.make_dangerous() + # LOG: improve this string + self.make_dangerous('expected_mimetype') - def _check_mime(self): + def _check_mimetype(self): """Takes the mimetype (as determined by libmagic) and determines whether the list of extensions that are normally associated with that extension contains the file's actual extension.""" - if aliases.get(self.mimetype) is not None: - mimetype = aliases.get(self.mimetype) + if self.mimetype in Config.aliases: + mimetype = Config.aliases[self.mimetype] else: mimetype = self.mimetype - expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False) + expected_extensions = mimetypes.guess_all_extensions(mimetype, + strict=False) if expected_extensions: - if len(self.extension) > 0 and self.extension not in expected_extensions: - self.log_details.update({'expected_extensions': expected_extensions}) - self.make_dangerous() + if self.has_extension and self.extension not in expected_extensions: + # LOG: improve this string + self.make_dangerous('expected extensions') + + def check(self): + self._check_dangerous() + if self.has_extension: + self._check_extension() + if self.has_mimetype: + self._check_mimetype() + if not self.is_dangerous: + self.mime_processing_options.get(self.main_type, self.unknown)() + # ##### Helper functions ##### + def _make_method_dict(self, list_of_tuples): + """Returns a dictionary with mimetype: method pairs.""" + dict_to_return = {} + for list_of_subtypes, method in list_of_tuples: + for subtype in list_of_subtypes: + dict_to_return[subtype] = method + return dict_to_return + + @property def has_metadata(self): - if self.mimetype in mimes_metadata: + """True if filetype typically contains metadata, else False.""" + if self.mimetype in Config.mimes_metadata: return True return False - -class KittenGroomerFileCheck(KittenGroomerBase): - - def __init__(self, root_src=None, root_dst=None, max_recursive_depth=2, debug=False): - if root_src is None: - root_src = os.path.join(os.sep, 'media', 'src') - if root_dst is None: - root_dst = os.path.join(os.sep, 'media', 'dst') - super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug) - self.recursive_archive_depth = 0 - self.max_recursive_depth = max_recursive_depth - - subtypes_apps = [ - (mimes_office, self._winoffice), - (mimes_ooxml, self._ooxml), - (mimes_rtf, self.text), - (mimes_libreoffice, self._libreoffice), - (mimes_pdf, self._pdf), - (mimes_xml, self.text), - (mimes_ms, self._executables), - (mimes_compressed, self._archive), - (mimes_data, self._binary_app), - ] - self.subtypes_application = self._init_subtypes_application(subtypes_apps) - - types_metadata = [ - (mimes_exif, self._metadata_exif), - (mimes_png, self._metadata_png), - ] - self.metadata_processing_options = self._init_subtypes_application(types_metadata) - - self.mime_processing_options = { - 'text': self.text, - 'audio': self.audio, - 'image': self.image, - 'video': self.video, - 'application': self.application, - 'example': self.example, - 'message': self.message, - 'model': self.model, - 'multipart': self.multipart, - 'inode': self.inode, - } - - # ##### Helper functions ##### - def _init_subtypes_application(self, subtypes_application): - """Creates a dictionary with the right method based on the sub mime type.""" - subtype_dict = {} - for list_subtypes, func in subtypes_application: - for st in list_subtypes: - subtype_dict[st] = func - return subtype_dict - - def _print_log(self): - """Print the logs related to the current file being processed.""" - # TODO: change name to _write_log - tmp_log = self.log_name.fields(**self.cur_file.log_details) - if self.cur_file.is_dangerous(): - tmp_log.warning(self.cur_file.log_string) - elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'): - tmp_log.info(self.cur_file.log_string) - else: - tmp_log.debug(self.cur_file.log_string) - - def _run_process(self, command_string, timeout=None): - """Run command_string in a subprocess, wait until it finishes.""" - args = shlex.split(command_string) - with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout: - try: - subprocess.check_call(args, stdout=stdout, stderr=stderr, timeout=timeout) - except (subprocess.TimeoutExpired, subprocess.CalledProcessError): - return - return True + def make_tempdir(self): + """Make a temporary directory at self.tempdir_path.""" + self.tempdir_path = self.dst_path + '_temp' + if not os.path.exists(self.tempdir_path): + os.makedirs(self.tempdir_path) + return self.tempdir_path ####################### # ##### Discarded mimetypes, reason in the docstring ###### def inode(self): """Empty file or symlink.""" - if self.cur_file.is_symlink(): - self.cur_file.log_string += 'Symlink to {}'.format(self.cur_file.log_details['symlink']) + if self.is_symlink: + symlink_path = self.get_property('symlink') + self.add_file_string('Symlink to {}'.format(symlink_path)) else: - self.cur_file.log_string += 'Inode file' + self.add_file_string('Inode file') + self.should_copy = False def unknown(self): """Main type should never be unknown.""" - self.cur_file.log_string += 'Unknown file' + self.add_file_string('Unknown file') + self.should_copy = False def example(self): """Used in examples, should never be returned by libmagic.""" - self.cur_file.log_string += 'Example file' + self.add_file_string('Example file') + self.should_copy = False def multipart(self): """Used in web apps, should never be returned by libmagic""" - self.cur_file.log_string += 'Multipart file' + self.add_file_string('Multipart file') + self.should_copy = False # ##### Treated as malicious, no reason to have it on a USB key ###### def message(self): """Process a message file.""" - self.cur_file.log_string += 'Message file' - self.cur_file.make_dangerous() - self._safe_copy() + self.add_file_string('Message file') + self.make_dangerous('Message file') def model(self): """Process a model file.""" - self.cur_file.log_string += 'Model file' - self.cur_file.make_dangerous() - self._safe_copy() + self.add_file_string('Model file') + self.make_dangerous('Model file') # ##### Files that will be converted ###### def text(self): """Process an rtf, ooxml, or plaintext file.""" - for r in mimes_rtf: - if r in self.cur_file.sub_type: - self.cur_file.log_string += 'Rich Text file' + for mt in Config.mimes_rtf: + if mt in self.sub_type: + self.add_file_string('Rich Text file') # TODO: need a way to convert it to plain text - self.cur_file.force_ext('.txt') - self._safe_copy() + self.force_ext('.txt') return - for o in mimes_ooxml: - if o in self.cur_file.sub_type: - self.cur_file.log_string += 'OOXML File' + for mt in Config.mimes_ooxml: + if mt in self.sub_type: + self.add_file_string('OOXML File') self._ooxml() return - self.cur_file.log_string += 'Text file' - self.cur_file.force_ext('.txt') - self._safe_copy() + self.add_file_string('Text file') + self.force_ext('.txt') def application(self): - """Processes an application specific file according to its subtype.""" - for subtype, fct in self.subtypes_application.items(): - if subtype in self.cur_file.sub_type: - fct() - self.cur_file.log_string += 'Application file' + """Process an application specific file according to its subtype.""" + for subtype, method in self.app_subtype_methods.items(): + if subtype in self.sub_type: + # TODO: should we change the logic so we don't iterate through all of the subtype methods? + # TODO: should these methods return a value? + method() + self.add_file_string('Application file') return - self.cur_file.log_string += 'Unknown Application file' + self.add_file_string('Unknown Application file') self._unknown_app() def _executables(self): - """Processes an executable file.""" - self.cur_file.add_log_details('processing_type', 'executable') - self.cur_file.make_dangerous() - self._safe_copy() + """Process an executable file.""" + # LOG: change the processing_type property to some other name or include in file_string + self.set_property('processing_type', 'executable') + self.make_dangerous('executable') def _winoffice(self): - """Processes a winoffice file using olefile/oletools.""" - self.cur_file.add_log_details('processing_type', 'WinOffice') - # Try as if it is a valid document - oid = oletools.oleid.OleID(self.cur_file.src_path) - if not olefile.isOleFile(self.cur_file.src_path): + """Process a winoffice file using olefile/oletools.""" + # LOG: processing_type property + self.set_property('processing_type', 'WinOffice') + oid = oletools.oleid.OleID(self.src_path) # First assume a valid file + if not olefile.isOleFile(self.src_path): # Manual processing, may already count as suspicious try: - ole = olefile.OleFileIO(self.cur_file.src_path, raise_defects=olefile.DEFECT_INCORRECT) + ole = olefile.OleFileIO(self.src_path, raise_defects=olefile.DEFECT_INCORRECT) except: - self.cur_file.add_log_details('not_parsable', True) - self.cur_file.make_dangerous() + self.make_dangerous('not parsable') if ole.parsing_issues: - self.cur_file.add_log_details('parsing_issues', True) - self.cur_file.make_dangerous() + self.make_dangerous('parsing issues') else: if ole.exists('macros/vba') or ole.exists('Macros') \ or ole.exists('_VBA_PROJECT_CUR') or ole.exists('VBA'): - self.cur_file.add_log_details('macro', True) - self.cur_file.make_dangerous() + self.make_dangerous('macro') else: indicators = oid.check() - # Encrypted ban be set by multiple checks on the script + # Encrypted can be set by multiple checks on the script if oid.encrypted.value: - self.cur_file.add_log_details('encrypted', True) - self.cur_file.make_dangerous() + self.make_dangerous('encrypted') if oid.macros.value or oid.ole.exists('macros/vba') or oid.ole.exists('Macros') \ or oid.ole.exists('_VBA_PROJECT_CUR') or oid.ole.exists('VBA'): - self.cur_file.add_log_details('macro', True) - self.cur_file.make_dangerous() + self.make_dangerous('macro') for i in indicators: if i.id == 'ObjectPool' and i.value: - # FIXME: Is it suspicious? - self.cur_file.add_log_details('objpool', True) + # TODO: Is it suspicious? + # LOG: user defined property + self.set_property('objpool', True) elif i.id == 'flash' and i.value: - self.cur_file.add_log_details('flash', True) - self.cur_file.make_dangerous() - self._safe_copy() + self.make_dangerous('flash') def _ooxml(self): - """Processes an ooxml file.""" - self.cur_file.add_log_details('processing_type', 'ooxml') + """Process an ooxml file.""" + # LOG: processing_type property + self.set_property('processing_type', 'ooxml') try: - doc = officedissector.doc.Document(self.cur_file.src_path) + doc = officedissector.doc.Document(self.src_path) except Exception: - # Invalid file - self.cur_file.make_dangerous() - self._safe_copy() + self.make_dangerous('invalid ooxml file') return # There are probably other potentially malicious features: # fonts, custom props, custom XML if doc.is_macro_enabled or len(doc.features.macros) > 0: - self.cur_file.add_log_details('macro', True) - self.cur_file.make_dangerous() + self.make_dangerous('macro') if len(doc.features.embedded_controls) > 0: - self.cur_file.add_log_details('activex', True) - self.cur_file.make_dangerous() + self.make_dangerous('activex') if len(doc.features.embedded_objects) > 0: # Exploited by CVE-2014-4114 (OLE) - self.cur_file.add_log_details('embedded_obj', True) - self.cur_file.make_dangerous() + self.make_dangerous('embedded obj') if len(doc.features.embedded_packages) > 0: - self.cur_file.add_log_details('embedded_pack', True) - self.cur_file.make_dangerous() - self._safe_copy() + self.make_dangerous('embedded pack') def _libreoffice(self): - """Processes a libreoffice file.""" - self.cur_file.add_log_details('processing_type', 'libreoffice') - # As long as there ar no way to do a sanity check on the files => dangerous + """Process a libreoffice file.""" + self.set_property('processing_type', 'libreoffice') + # As long as there is no way to do a sanity check on the files => dangerous try: - lodoc = zipfile.ZipFile(self.cur_file.src_path, 'r') + lodoc = zipfile.ZipFile(self.src_path, 'r') except: - self.cur_file.add_log_details('invalid', True) - self.cur_file.make_dangerous() + # TODO: are there specific exceptions we should catch here? Or is anything ok + self.make_dangerous('invalid libreoffice file') for f in lodoc.infolist(): fname = f.filename.lower() if fname.startswith('script') or fname.startswith('basic') or \ fname.startswith('object') or fname.endswith('.bin'): - self.cur_file.add_log_details('macro', True) - self.cur_file.make_dangerous() - self._safe_copy() + self.make_dangerous('macro') def _pdf(self): - """Processes a PDF file.""" - self.cur_file.add_log_details('processing_type', 'pdf') - xmlDoc = PDFiD(self.cur_file.src_path) + """Process a PDF file.""" + # LOG: processing_type property + self.set_property('processing_type', 'pdf') + xmlDoc = PDFiD(self.src_path) oPDFiD = cPDFiD(xmlDoc, True) - # TODO: other keywords? + # TODO: are there other characteristics which should be dangerous? if oPDFiD.encrypt.count > 0: - self.cur_file.add_log_details('encrypted', True) - self.cur_file.make_dangerous() + self.make_dangerous('encrypted pdf') if oPDFiD.js.count > 0 or oPDFiD.javascript.count > 0: - self.cur_file.add_log_details('javascript', True) - self.cur_file.make_dangerous() + self.make_dangerous('pdf with javascript') if oPDFiD.aa.count > 0 or oPDFiD.openaction.count > 0: - self.cur_file.add_log_details('openaction', True) - self.cur_file.make_dangerous() + self.make_dangerous('openaction') if oPDFiD.richmedia.count > 0: - self.cur_file.add_log_details('flash', True) - self.cur_file.make_dangerous() + self.make_dangerous('flash') if oPDFiD.launch.count > 0: - self.cur_file.add_log_details('launch', True) - self.cur_file.make_dangerous() - self._safe_copy() + self.make_dangerous('launch') def _archive(self): - """Processes an archive using 7zip. The archive is extracted to a - temporary directory and self.processdir is called on that directory. - The recursive archive depth is increased to protect against archive - bombs.""" - self.cur_file.add_log_details('processing_type', 'archive') - self.cur_file.is_recursive = True - self.cur_file.log_string += 'Archive extracted, processing content.' - tmpdir = self.cur_file.dst_path + '_temp' - self._safe_mkdir(tmpdir) - extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ_PATH, self.cur_file.src_path, tmpdir) - self._run_process(extract_command) - self.recursive_archive_depth += 1 - self.tree(tmpdir) - self.processdir(tmpdir, self.cur_file.dst_path) - self.recursive_archive_depth -= 1 - self._safe_rmtree(tmpdir) - - def _handle_archivebomb(self, src_dir): - self.cur_file.make_dangerous() - self.cur_file.add_log_details('Archive Bomb', True) - self.log_name.warning('ARCHIVE BOMB.') - self.log_name.warning('The content of the archive contains recursively other archives.') - self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.') - self._safe_rmtree(src_dir) - if src_dir.endswith('_temp'): - bomb_path = src_dir[:-len('_temp')] - self._safe_remove(bomb_path) + """ + Process an archive using 7zip. + + The archive is extracted to a temporary directory and self.process_dir + is called on that directory. The recursive archive depth is increased + to protect against archive bombs. + """ + # LOG: change this to something archive specific + self.set_property('processing_type', 'archive') + self.should_copy = False + self.is_recursive = True def _unknown_app(self): - """Processes an unknown file.""" - self.cur_file.make_unknown() - self._safe_copy() + """Process an unknown file.""" + self.make_unknown() def _binary_app(self): - """Processses an unknown binary file.""" - self.cur_file.make_binary() - self._safe_copy() + """Process an unknown binary file.""" + self.make_binary() ####################### # Metadata extractors - def _metadata_exif(self, metadata_file): - img = open(self.cur_file.src_path, 'rb') + def _metadata_exif(self, metadata_file_path): + """Read exif metadata from a jpg or tiff file using exifread.""" + # TODO: this method is kind of long, can we shorten it somehow? + img = open(self.src_path, 'rb') tags = None - try: tags = exifread.process_file(img, debug=True) except Exception as e: - print("Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.cur_file.src_path)) - print(e) + self.add_error(e, "Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.src_path)) if tags is None: try: tags = exifread.process_file(img, debug=True) except Exception as e: - print("Failed to get any metadata for file {}.".format(self.cur_file.src_path)) - print(e) + self.add_error(e, "Failed to get any metadata for file {}.".format(self.src_path)) img.close() return False - for tag in sorted(tags.keys()): - # These are long and obnoxious/binary + # These tags are long and obnoxious/binary so we don't add them if tag not in ('JPEGThumbnail', 'TIFFThumbnail'): - printable = str(tags[tag]) - + tag_string = str(tags[tag]) # Exifreader truncates data. - if len(printable) > 25 and printable.endswith(", ... ]"): - value = tags[tag].values - if isinstance(value, str): - printable = value - else: - printable = str(value) - metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable)) - self.cur_file.add_log_details('metadata', 'exif') + if len(tag_string) > 25 and tag_string.endswith(", ... ]"): + tag_value = tags[tag].values + tag_string = str(tag_value) + with open(metadata_file_path, 'w+') as metadata_file: + metadata_file.write("Key: {}\tValue: {}\n".format(tag, tag_string)) + # LOG: how do we want to log metadata? + self.set_property('metadata', 'exif') img.close() return True - def _metadata_png(self, metadataFile): + def _metadata_png(self, metadata_file_path): + """Extract metadata from a png file using PIL/Pillow.""" warnings.simplefilter('error', Image.DecompressionBombWarning) try: - img = Image.open(self.cur_file.src_path) + img = Image.open(self.src_path) for tag in sorted(img.info.keys()): # These are long and obnoxious/binary if tag not in ('icc_profile'): - metadataFile.write("Key: {}\tValue: {}\n".format(tag, img.info[tag])) - self.cur_file.add_log_details('metadata', 'png') + with open(metadata_file_path, 'w+') as metadata_file: + metadata_file.write("Key: {}\tValue: {}\n".format(tag, img.info[tag])) + # LOG: handle metadata + self.set_property('metadata', 'png') img.close() # Catch decompression bombs except Exception as e: - print("Caught exception processing metadata for {}".format(self.cur_file.src_path)) - print(e) - self.cur_file.make_dangerous() - self._safe_copy() + # TODO: only catch DecompressionBombWarnings here? + self.add_error(e, "Caught exception processing metadata for {}".format(self.src_path)) + self.make_dangerous('exception processing metadata') return False def extract_metadata(self): - metadata_file = self._safe_metadata_split(".metadata.txt") - success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadata_file) - metadata_file.close() - if not success: - # FIXME Delete empty metadata file - pass + """Create metadata file and call correct metadata extraction method.""" + metadata_file_path = self.create_metadata_file(".metadata.txt") + mt = self.mimetype + metadata_processing_method = self.metadata_mimetype_methods.get(mt) + if metadata_processing_method: + # TODO: should we return metadata and write it here instead of in processing method? + metadata_processing_method(metadata_file_path) ####################### # ##### Media - audio and video aren't converted ###### def audio(self): - """Processes an audio file.""" - self.cur_file.log_string += 'Audio file' + """Process an audio file.""" + self.log_string += 'Audio file' self._media_processing() def video(self): - """Processes a video.""" - self.cur_file.log_string += 'Video file' + """Process a video.""" + self.log_string += 'Video file' self._media_processing() def _media_processing(self): """Generic way to process all media files.""" - self.cur_file.add_log_details('processing_type', 'media') - self._safe_copy() + self.set_property('processing_type', 'media') def image(self): - """Processes an image. - - Extracts metadata if metadata is present. Creates a temporary - directory, opens the using PIL.Image, saves it to the temporary - directory, and copies it to the destination.""" - if self.cur_file.has_metadata(): + """ + Process an image. + + Extracts metadata to dest key using self.extract_metada() if metadata + is present. Creates a temporary directory on dest key, opens the image + using PIL.Image, saves it to the temporary directory, and copies it to + the destination. + """ + # TODO: make sure this method works for png, gif, tiff + if self.has_metadata: self.extract_metadata() - - # FIXME make sure this works for png, gif, tiff - # Create a temp directory - dst_dir, filename = os.path.split(self.cur_file.dst_path) - tmpdir = os.path.join(dst_dir, 'temp') - tmppath = os.path.join(tmpdir, filename) - self._safe_mkdir(tmpdir) - - # Do our image conversions + tempdir_path = self.make_tempdir() + tempfile_path = os.path.join(tempdir_path, self.filename) warnings.simplefilter('error', Image.DecompressionBombWarning) - try: - imIn = Image.open(self.cur_file.src_path) - imOut = Image.frombytes(imIn.mode, imIn.size, imIn.tobytes()) - imOut.save(tmppath) - - # Copy the file back out and cleanup - self._safe_copy(tmppath) - self._safe_rmtree(tmpdir) + try: # Do image conversions + img_in = Image.open(self.src_path) + img_out = Image.frombytes(img_in.mode, img_in.size, img_in.tobytes()) + img_out.save(tempfile_path) + self.src_path = tempfile_path + except Exception as e: # Catch decompression bombs + # TODO: change this from all Exceptions to specific DecompressionBombWarning + self.add_error(e, "Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path)) + self.make_dangerous() + self.add_file_string('Image file') + self.set_property('processing_type', 'image') - # Catch decompression bombs - except Exception as e: - print("Caught exception (possible decompression bomb?) while translating file {}.".format(self.cur_file.src_path)) - print(e) - self.cur_file.make_dangerous() - self._safe_copy() - self.cur_file.log_string += 'Image file' - self.cur_file.add_log_details('processing_type', 'image') +class KittenGroomerFileCheck(KittenGroomerBase): - ####################### + def __init__(self, root_src, root_dst, max_recursive_depth=2, debug=False): + super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug) + self.recursive_archive_depth = 0 + self.max_recursive_depth = max_recursive_depth - def process_file(self, srcpath, dstpath, relative_path): - self.cur_file = File(srcpath, dstpath) - self.log_name.info('Processing {} ({}/{})', - relative_path, - self.cur_file.main_type, - self.cur_file.sub_type) - if not self.cur_file.is_dangerous(): - self.mime_processing_options.get(self.cur_file.main_type, self.unknown)() + def process_dir(self, src_dir, dst_dir): + """Process a directory on the source key.""" + self.logger.tree(src_dir) + for srcpath in self.list_all_files(src_dir): + dstpath = srcpath.replace(src_dir, dst_dir) + # TODO: Can we clean up the way we handle relative_path? + # Relative path is here so that when we print files in the log it + # shows only the file's path. Should we just pass it to the logger + # when we create it? Or let the logger figure it out? + # relative_path = srcpath.replace(src_dir + '/', '') + self.cur_file = File(srcpath, dstpath, self.logger) + self.process_file(self.cur_file) + + def process_file(self, file): + """ + Process an individual file. + + Check the file, handle archives using self.process_archive, copy + the file to the destionation key, and clean up temporary directory. + """ + file.check() + if file.is_recursive: + self.process_archive(file) + elif file.should_copy: + file.safe_copy() + file.set_property('copied', True) + file.write_log() + if hasattr(file, 'tempdir_path'): + self.safe_rmtree(file.tempdir_path) + + def process_archive(self, file): + """ + Unpack an archive using 7zip and process contents using process_dir. + + Should be given a Kittengroomer file object whose src_path points + to an archive. + """ + self.recursive_archive_depth += 1 + # LOG: write_log or somehow log the archive file here + if self.recursive_archive_depth >= self.max_recursive_depth: + file.make_dangerous('Archive bomb') else: - self._safe_copy() - if not self.cur_file.is_recursive: - self._print_log() + tempdir_path = file.make_tempdir() + # TODO: double check we are properly escaping file.src_path + # otherwise we are running unvalidated user input directly in the shell + command_str = '{} -p1 x "{}" -o"{}" -bd -aoa' + unpack_command = command_str.format(SEVENZ_PATH, + file.src_path, tempdir_path) + self._run_process(unpack_command) + self.process_dir(tempdir_path, file.dst_path) + self.safe_rmtree(tempdir_path) + self.recursive_archive_depth -= 1 - def processdir(self, src_dir=None, dst_dir=None): - """Main function coordinating file processing.""" - if src_dir is None: - src_dir = self.src_root_dir - if dst_dir is None: - dst_dir = self.dst_root_dir + def _run_process(self, command_string, timeout=None): + """Run command_string in a subprocess, wait until it finishes.""" + args = shlex.split(command_string) + with open(self.logger.log_debug_err, 'ab') as stderr, open(self.logger.log_debug_out, 'ab') as stdout: + try: + subprocess.check_call(args, stdout=stdout, stderr=stderr, timeout=timeout) + except (subprocess.TimeoutExpired, subprocess.CalledProcessError): + return + return True - if self.recursive_archive_depth > 0: - self._print_log() + def run(self): + self.process_dir(self.src_root_dir, self.dst_root_dir) - if self.recursive_archive_depth >= self.max_recursive_depth: - self._handle_archivebomb(src_dir) - for srcpath in self._list_all_files(src_dir): - dstpath = srcpath.replace(src_dir, dst_dir) - relative_path = srcpath.replace(src_dir + '/', '') - # which path do we want in the log? - self.process_file(srcpath, dstpath, relative_path) +def main(kg_implementation, description): + parser = argparse.ArgumentParser(prog='KittenGroomer', description=description) + parser.add_argument('-s', '--source', type=str, help='Source directory') + parser.add_argument('-d', '--destination', type=str, help='Destination directory') + args = parser.parse_args() + kg = kg_implementation(args.source, args.destination) + kg.run() if __name__ == '__main__': diff --git a/examples/generic.py b/examples/generic.py index e76fccd..220b3db 100644 --- a/examples/generic.py +++ b/examples/generic.py @@ -339,7 +339,7 @@ def processdir(self, src_dir=None, dst_dir=None): archbomb_path = src_dir[:-len('_temp')] self._safe_remove(archbomb_path) - for srcpath in self._list_all_files(src_dir): + for srcpath in self.list_all_files(src_dir): self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir)) self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''), diff --git a/examples/pier9.py b/examples/pier9.py index 6ded725..9252c4f 100644 --- a/examples/pier9.py +++ b/examples/pier9.py @@ -54,7 +54,7 @@ def processdir(self): ''' Main function doing the processing ''' - for srcpath in self._list_all_files(self.src_root_dir): + for srcpath in self.list_all_files(self.src_root_dir): self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', '')) self.cur_file = FilePier9(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir)) if not self.cur_file.is_dangerous() and self.cur_file.extension in self.authorized_extensions: diff --git a/examples/specific.py b/examples/specific.py index fcca2f4..724ba35 100644 --- a/examples/specific.py +++ b/examples/specific.py @@ -54,7 +54,7 @@ def processdir(self): ''' to_copy = [] error = [] - for srcpath in self._list_all_files(self.src_root_dir): + for srcpath in self.list_all_files(self.src_root_dir): valid = True self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', '')) self.cur_file = FileSpec(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir)) diff --git a/kittengroomer/__init__.py b/kittengroomer/__init__.py index 39aa699..8428553 100644 --- a/kittengroomer/__init__.py +++ b/kittengroomer/__init__.py @@ -1,4 +1,4 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from .helpers import FileBase, KittenGroomerBase, main +from .helpers import FileBase, KittenGroomerBase, GroomerLogger, main diff --git a/kittengroomer/helpers.py b/kittengroomer/helpers.py index 4e82a73..318f1ca 100644 --- a/kittengroomer/helpers.py +++ b/kittengroomer/helpers.py @@ -9,13 +9,12 @@ import os -import sys import hashlib import shutil import argparse import magic -from twiggy import quick_setup, log +import twiggy class KittenGroomerError(Exception): @@ -28,197 +27,268 @@ def __init__(self, message): class ImplementationRequired(KittenGroomerError): """Implementation required error.""" - pass class FileBase(object): """ - Base object for individual files in the source directory. Contains file - attributes and various helper methods. Subclass and add attributes - or methods relevant to a given implementation. + Base object for individual files in the source directory. + + Contains file attributes and various helper methods. """ - def __init__(self, src_path, dst_path): - """Initialized with the source path and expected destination path.""" + def __init__(self, src_path, dst_path, logger=None): + """ + Initialized with the source path and expected destination path. + + self.logger should be a logging object with an add_file method. + Create various properties and determine the file's mimetype. + """ self.src_path = src_path self.dst_path = dst_path - self.log_details = {'filepath': self.src_path} - self.log_string = '' - self._determine_extension() - self._determine_mimetype() + self.filename = os.path.basename(self.src_path) + self.logger = logger + self._file_props = { + 'filepath': self.src_path, + 'filename': self.filename, + 'file_size': self.size, + 'maintype': None, + 'subtype': None, + 'extension': None, + 'safety_category': None, + 'symlink': False, + 'copied': False, + 'file_string_set': set(), + 'errors': {}, + 'user_defined': {} + } + self.extension = self._determine_extension() + self.set_property('extension', self.extension) + self.mimetype = self._determine_mimetype() + self.should_copy = True + self.main_type = None + self.sub_type = None + if self.mimetype: + self.main_type, self.sub_type = self._split_subtypes(self.mimetype) + if self.main_type: + self.set_property('maintype', self.main_type) + if self.sub_type: + self.set_property('subtype', self.sub_type) def _determine_extension(self): _, ext = os.path.splitext(self.src_path) - self.extension = ext.lower() + ext = ext.lower() + if ext == '': + ext = None + return ext def _determine_mimetype(self): if os.path.islink(self.src_path): # magic will throw an IOError on a broken symlink - self.mimetype = 'inode/symlink' + mimetype = 'inode/symlink' + self.set_property('symlink', os.readlink(self.src_path)) else: try: mt = magic.from_file(self.src_path, mime=True) - # magic will always return something, even if it's just 'data' + # Note: magic will always return something, even if it's just 'data' except UnicodeEncodeError as e: # FIXME: The encoding of the file is broken (possibly UTF-16) - mt = '' - self.log_details.update({'UnicodeError': e}) + # Note: one of the Travis files will trigger this exception + self.add_error(e, '') + mt = None try: - self.mimetype = mt.decode("utf-8") + mimetype = mt.decode("utf-8") except: - self.mimetype = mt - if self.mimetype and '/' in self.mimetype: - self.main_type, self.sub_type = self.mimetype.split('/') + mimetype = mt + return mimetype + + def _split_subtypes(self, mimetype): + if '/' in mimetype: + main_type, sub_type = mimetype.split('/') else: - self.main_type = '' - self.sub_type = '' + main_type, sub_type = None, None + return main_type, sub_type - def has_mimetype(self): - """ - Returns True if file has a full mimetype, else False. + @property + def size(self): + """Filesize in bytes as an int, 0 if file does not exist.""" + try: + size = os.path.getsize(self.src_path) + except FileNotFoundError: + size = 0 + return size - Returns False + updates log if self.main_type or self.sub_type - are not set. - """ + @property + def has_mimetype(self): + """True if file has a main and sub mimetype, else False.""" + # TODO: broken mimetype checks should be done somewhere else. + # Should the check be by default or should we let the API consumer write it? if not self.main_type or not self.sub_type: - self.log_details.update({'broken_mime': True}) return False - return True + else: + return True + @property def has_extension(self): - """ - Returns True if self.extension is set, else False. - - Returns False + updates self.log_details if self.extension is not set. - """ - if self.extension == '': - self.log_details.update({'no_extension': True}) + """True if self.extension is set, else False.""" + if self.extension is None: return False - return True + else: + return True + @property def is_dangerous(self): - """Returns True if self.log_details contains 'dangerous'.""" - return ('dangerous' in self.log_details) + """True if file has been marked 'dangerous', else False.""" + return self._file_props['safety_category'] is 'dangerous' + @property def is_unknown(self): - """Returns True if self.log_details contains 'unknown'.""" - return ('unknown' in self.log_details) + """True if file has been marked 'unknown', else False.""" + return self._file_props['safety_category'] is 'unknown' + @property def is_binary(self): - """returns True if self.log_details contains 'binary'.""" - return ('binary' in self.log_details) + """True if file has been marked 'binary', else False.""" + return self._file_props['safety_category'] is 'binary' + @property def is_symlink(self): - """Returns True and updates log if file is a symlink.""" - if self.has_mimetype() and self.main_type == 'inode' and self.sub_type == 'symlink': - self.log_details.update({'symlink': os.readlink(self.src_path)}) + """True if file is a symlink, else False.""" + if self._file_props['symlink'] is False: + return False + else: return True - return False - def add_log_details(self, key, value): - """Takes a key + a value and adds them to self.log_details.""" - self.log_details[key] = value + def set_property(self, prop_string, value): + """ + Take a property and a value and add them to self._file_props. + + If prop_string is already in _file_props, set prop_string to value. + If prop_string not in _file_props, set prop_string to value in + _file_props['user_defined']. + """ + if prop_string in self._file_props.keys(): + self._file_props[prop_string] = value + else: + self._file_props['user_defined'][prop_string] = value + + def get_property(self, file_prop): + """Get the value for a property in _file_props.""" + # TODO: could probably be refactored + if file_prop in self._file_props: + return self._file_props[file_prop] + elif file_prop in self._file_props['user_defined']: + return self._file_props['user_defined'][file_prop] + else: + return None + + def add_error(self, error, info): + """Add an error: info pair to _file_props['errors'].""" + self._file_props['errors'].update({error: info}) - def make_dangerous(self): + def add_file_string(self, file_string): + """Add a file descriptor string to _file_props.""" + self._file_props['file_string_set'].add(file_string) + + def make_dangerous(self, reason_string=None): """ - Marks a file as dangerous. + Mark file as dangerous. - Prepends and appends DANGEROUS to the destination file name + Prepend and append DANGEROUS to the destination file name to help prevent double-click of death. """ - if self.is_dangerous(): + if self.is_dangerous: return - self.log_details['dangerous'] = True + self.set_property('safety_category', 'dangerous') + # LOG: store reason string somewhere and do something with it path, filename = os.path.split(self.dst_path) self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename)) def make_unknown(self): - """Marks a file as an unknown type and prepends UNKNOWN to filename.""" - if self.is_dangerous() or self.is_binary(): + """Mark file as an unknown type and prepend UNKNOWN to filename.""" + if self.is_dangerous or self.is_binary: return - self.log_details['unknown'] = True + self.set_property('safety_category', 'unknown') path, filename = os.path.split(self.dst_path) self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename)) def make_binary(self): - """Marks a file as a binary and appends .bin to filename.""" - if self.is_dangerous(): + """Mark file as a binary and append .bin to filename.""" + if self.is_dangerous: return - self.log_details['binary'] = True + self.set_property('safety_category', 'binary') path, filename = os.path.split(self.dst_path) self.dst_path = os.path.join(path, '{}.bin'.format(filename)) + def safe_copy(self, src=None, dst=None): + """Copy file and create destination directories if needed.""" + if src is None: + src = self.src_path + if dst is None: + dst = self.dst_path + try: + dst_path, filename = os.path.split(dst) + if not os.path.exists(dst_path): + os.makedirs(dst_path) + shutil.copy(src, dst) + except Exception as e: + self.add_error(e, '') + def force_ext(self, ext): - """If dst_path does not end in ext, appends the ext and updates log.""" + """If dst_path does not end in ext, change it and edit _file_props.""" if not self.dst_path.endswith(ext): - self.log_details['force_ext'] = True + self.set_property('force_ext', True) self.dst_path += ext + if not self._file_props['extension'] == ext: + self.set_property('extension', ext) + def create_metadata_file(self, ext): + """Create a separate file to hold metadata from this file.""" + try: + # make sure we aren't overwriting anything + if os.path.exists(self.src_path + ext): + raise KittenGroomerError("Cannot create split metadata file for \"" + + self.dst_path + "\", type '" + + ext + "': File exists.") + else: + dst_dir_path, filename = os.path.split(self.dst_path) + if not os.path.exists(dst_dir_path): + os.makedirs(dst_dir_path) + # TODO: Check extension for leading "." + self.metadata_file_path = self.dst_path + ext + return self.metadata_file_path + except KittenGroomerError as e: + self.add_error(e, '') + return False -class KittenGroomerBase(object): - """Base object responsible for copy/sanitization process.""" - - def __init__(self, root_src, root_dst, debug=False): - """Initialized with path to source and dest directories.""" - self.src_root_dir = root_src - self.dst_root_dir = root_dst - self.log_root_dir = os.path.join(self.dst_root_dir, 'logs') - self._safe_rmtree(self.log_root_dir) - self._safe_mkdir(self.log_root_dir) - self.log_processing = os.path.join(self.log_root_dir, 'processing.log') - self.log_content = os.path.join(self.log_root_dir, 'content.log') - self.tree(self.src_root_dir) - - quick_setup(file=self.log_processing) - self.log_name = log.name('files') - self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data') - os.environ["PATH"] += os.pathsep + self.resources_path - - self.cur_file = None - - self.debug = debug - if self.debug: - self.log_debug_err = os.path.join(self.log_root_dir, 'debug_stderr.log') - self.log_debug_out = os.path.join(self.log_root_dir, 'debug_stdout.log') + def write_log(self): + """Write logs from file to self.logger.""" + file_log = self.logger.add_file(self) + file_log.fields(**self._file_props) + + +class GroomerLogger(object): + """Groomer logging interface.""" + + def __init__(self, root_dir_path, debug=False): + self.root_dir = root_dir_path + self.log_dir_path = os.path.join(root_dir_path, 'logs') + if os.path.exists(self.log_dir_path): + shutil.rmtree(self.log_dir_path) + os.makedirs(self.log_dir_path) + self.log_processing = os.path.join(self.log_dir_path, 'processing.log') + self.log_content = os.path.join(self.log_dir_path, 'content.log') + twiggy.quick_setup(file=self.log_processing) + self.log = twiggy.log.name('files') + if debug: + self.log_debug_err = os.path.join(self.log_dir_path, 'debug_stderr.log') + self.log_debug_out = os.path.join(self.log_dir_path, 'debug_stdout.log') else: self.log_debug_err = os.devnull self.log_debug_out = os.devnull - def _computehash(self, path): - """Returns a sha256 hash of a file at a given path.""" - s = hashlib.sha256() - with open(path, 'rb') as f: - while True: - buf = f.read(0x100000) - if not buf: - break - s.update(buf) - return s.hexdigest() - def tree(self, base_dir, padding=' '): - """Writes a graphical tree to the log for a given directory.""" - if sys.version_info.major == 2: - self.__tree_py2(base_dir, padding) - else: - self.__tree_py3(base_dir, padding) - - def __tree_py2(self, base_dir, padding=' '): - with open(self.log_content, 'ab') as lf: - lf.write('#' * 80 + '\n') - lf.write('{}+- {}/\n'.format(padding, os.path.basename(os.path.abspath(base_dir)))) - padding += '| ' - files = sorted(os.listdir(base_dir)) - for f in files: - curpath = os.path.join(base_dir, f) - if os.path.islink(curpath): - lf.write('{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath))) - elif os.path.isdir(curpath): - self.tree(curpath, padding) - elif os.path.isfile(curpath): - lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath))) - - def __tree_py3(self, base_dir, padding=' '): + """Write a graphical tree to the log for `base_dir`.""" with open(self.log_content, 'ab') as lf: lf.write(bytes('#' * 80 + '\n', 'UTF-8')) lf.write(bytes('{}+- {}/\n'.format(padding, os.path.basename(os.path.abspath(base_dir)).encode()), 'utf8')) @@ -233,80 +303,64 @@ def __tree_py3(self, base_dir, padding=' '): elif os.path.isfile(curpath): lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath)).encode(errors='ignore')) - # ##### Helpers ##### - def _safe_rmtree(self, directory): + def _computehash(self, path): + """Return a sha256 hash of a file at a given path.""" + s = hashlib.sha256() + with open(path, 'rb') as f: + while True: + buf = f.read(0x100000) + if not buf: + break + s.update(buf) + return s.hexdigest() + + def add_file(self, file): + """Add a file to the log.""" + return self.log.name('file.src_path') + + +class KittenGroomerBase(object): + """Base object responsible for copy/sanitization process.""" + + def __init__(self, root_src, root_dst, debug=False): + """Initialized with path to source and dest directories.""" + self.src_root_dir = root_src + self.dst_root_dir = root_dst + self.debug = debug + self.cur_file = None + self.logger = GroomerLogger(self.dst_root_dir, debug) + + def safe_rmtree(self, directory): """Remove a directory tree if it exists.""" if os.path.exists(directory): shutil.rmtree(directory) - def _safe_remove(self, filepath): + def safe_remove(self, filepath): """Remove a file if it exists.""" if os.path.exists(filepath): os.remove(filepath) - def _safe_mkdir(self, directory): + def safe_mkdir(self, directory): """Make a directory if it does not exist.""" if not os.path.exists(directory): os.makedirs(directory) - def _safe_copy(self, src=None, dst=None): - """Copy a file and create directory if needed.""" - if src is None: - src = self.cur_file.src_path - if dst is None: - dst = self.cur_file.dst_path - try: - dst_path, filename = os.path.split(dst) - self._safe_mkdir(dst_path) - shutil.copy(src, dst) - return True - except Exception as e: - # TODO: Logfile - print(e) - return False - - def _safe_metadata_split(self, ext): - """Create a separate file to hold this file's metadata.""" - # TODO: fix logic in this method - dst = self.cur_file.dst_path - try: - if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well? - raise KittenGroomerError("Cannot create split metadata file for \"" + - self.cur_file.dst_path + "\", type '" + - ext + "': File exists.") - dst_path, filename = os.path.split(dst) - self._safe_mkdir(dst_path) - return open(dst + ext, 'w+') - except Exception as e: - # TODO: Logfile - print(e) - return False - - def _list_all_files(self, directory): - """Generate an iterator over all the files in a directory tree.""" + def list_all_files(self, directory): + """Generator yielding path to all of the files in a directory tree.""" for root, dirs, files in os.walk(directory): for filename in files: filepath = os.path.join(root, filename) yield filepath - def _print_log(self): - """ - Print log, should be called after each file. - - You probably want to reimplement it in the subclass. - """ - tmp_log = self.log_name.fields(**self.cur_file.log_details) - tmp_log.info('It did a thing.') - ####################### - def processdir(self, src_dir=None, dst_dir=None): - """ - Implement this function in your subclass to define file processing behavior. - """ + # TODO: feels like this function doesn't need to exist if we move main() + def processdir(self, src_dir, dst_dir): + """Implement this function to define file processing behavior.""" raise ImplementationRequired('Please implement processdir.') +# TODO: Maybe this shouldn't exist? It should probably get moved to filecheck since this isn't really API code def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'): parser = argparse.ArgumentParser(prog='KittenGroomer', description=description) parser.add_argument('-s', '--source', type=str, help='Source directory') diff --git a/setup.py b/setup.py index c11f64d..4397c5f 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setup( name='kittengroomer', - version='2.1', + version='2.1.0', author='Raphaël Vinot', author_email='raphael.vinot@circl.lu', maintainer='Raphaël Vinot', diff --git a/tests/dst/.keepdir b/tests/dst/.keepdir deleted file mode 100644 index e69de29..0000000 diff --git a/tests/logging.py b/tests/logging.py index e625137..c937a71 100644 --- a/tests/logging.py +++ b/tests/logging.py @@ -6,17 +6,17 @@ def save_logs(groomer, test_description): test_log_path = 'tests/test_logs/{}.log'.format(test_description) with open(test_log_path, 'w+') as test_log: test_log.write(divider.format('TEST LOG')) - with open(groomer.log_processing, 'r') as logfile: + with open(groomer.logger.log_processing, 'r') as logfile: log = logfile.read() test_log.write(log) if groomer.debug: - if os.path.exists(groomer.log_debug_err): + if os.path.exists(groomer.logger.log_debug_err): test_log.write(divider.format('ERR LOG')) - with open(groomer.log_debug_err, 'r') as debug_err: + with open(groomer.logger.log_debug_err, 'r') as debug_err: err = debug_err.read() test_log.write(err) - if os.path.exists(groomer.log_debug_out): + if os.path.exists(groomer.logger.log_debug_out): test_log.write(divider.format('OUT LOG')) - with open(groomer.log_debug_out, 'r') as debug_out: + with open(groomer.logger.log_debug_out, 'r') as debug_out: out = debug_out.read() test_log.write(out) diff --git a/tests/src_valid/Example.jpg b/tests/src_valid/Example.jpg new file mode 100644 index 0000000..a686d10 Binary files /dev/null and b/tests/src_valid/Example.jpg differ diff --git a/tests/test_filecheck.py b/tests/test_filecheck.py index ac7cf42..f58152d 100644 --- a/tests/test_filecheck.py +++ b/tests/test_filecheck.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import os +import shutil import pytest @@ -20,29 +21,46 @@ class TestIntegration: @pytest.fixture - def src_valid(self): + def src_valid_path(self): return os.path.join(os.getcwd(), 'tests/src_valid') @pytest.fixture - def src_invalid(self): + def src_invalid_path(self): return os.path.join(os.getcwd(), 'tests/src_invalid') @pytest.fixture def dst(self): return os.path.join(os.getcwd(), 'tests/dst') - def test_filecheck(self, src_invalid, dst): - groomer = KittenGroomerFileCheck(src_invalid, dst, debug=True) - groomer.processdir() + def test_filecheck_src_invalid(self, src_invalid_path): + dst_path = self.make_dst_dir_path(src_invalid_path) + groomer = KittenGroomerFileCheck(src_invalid_path, dst_path, debug=True) + groomer.run() test_description = "filecheck_invalid" save_logs(groomer, test_description) - def test_filecheck_2(self, src_valid, dst): - groomer = KittenGroomerFileCheck(src_valid, dst, debug=True) - groomer.processdir() + def test_filecheck_2(self, src_valid_path): + dst_path = self.make_dst_dir_path(src_valid_path) + groomer = KittenGroomerFileCheck(src_valid_path, dst_path, debug=True) + groomer.run() test_description = "filecheck_valid" save_logs(groomer, test_description) + def test_processdir(self): + pass + + def test_handle_archives(self): + pass + + def make_dst_dir_path(self, src_dir_path): + dst_path = src_dir_path + '_dst' + shutil.rmtree(dst_path, ignore_errors=True) + os.makedirs(dst_path, exist_ok=True) + return dst_path + class TestFileHandling: - pass + def test_autorun(self): + # Run on a single autorun file, confirm that it gets flagged as dangerous + # TODO: build out these and other methods for individual file cases + pass diff --git a/tests/test_kittengroomer.py b/tests/test_kittengroomer.py index 9698a95..940136d 100644 --- a/tests/test_kittengroomer.py +++ b/tests/test_kittengroomer.py @@ -5,7 +5,7 @@ import pytest -from kittengroomer import FileBase, KittenGroomerBase +from kittengroomer import FileBase, KittenGroomerBase, GroomerLogger from kittengroomer.helpers import ImplementationRequired skip = pytest.mark.skip @@ -30,7 +30,7 @@ def generic_conf_file(self, source_file, dest_file): return FileBase(source_file, dest_file) @fixture - def symlink(self, tmpdir): + def symlink_file(self, tmpdir): file_path = tmpdir.join('test.txt') file_path.write('testing') file_path = file_path.strpath @@ -65,7 +65,7 @@ def file_marked_unknown(self, generic_conf_file): @fixture def file_marked_binary(self, generic_conf_file): - generic_conf_file.mark_binary() + generic_conf_file.make_binary() return generic_conf_file @fixture(params=[ @@ -81,27 +81,17 @@ def file_marked_all_parameterized(self, request, generic_conf_file): # What should FileBase do if it's given a path that isn't a file (doesn't exist or is a dir)? Currently magic throws an exception # We should probably catch everytime that happens and tell the user explicitly happened (and maybe put it in the log) - def test_create(self): - file = FileBase('tests/src_valid/blah.conf', '/tests/dst/blah.conf') - def test_create_broken(self, tmpdir): with pytest.raises(TypeError): - file_no_args = FileBase() + FileBase() with pytest.raises(FileNotFoundError): - file_empty_args = FileBase('', '') + FileBase('', '') with pytest.raises(IsADirectoryError): - file_directory = FileBase(tmpdir.strpath, tmpdir.strpath) - # are there other cases here? path to a file that doesn't exist? permissions? + FileBase(tmpdir.strpath, tmpdir.strpath) + # TODO: are there other cases here? path to a file that doesn't exist? permissions? def test_init(self, generic_conf_file): - file = generic_conf_file - assert file.log_details - assert file.log_details['filepath'] == file.src_path - assert file.extension == '.conf' - copied_log = file.log_details.copy() - file.log_details = '' - # assert file.log_details == copied_log # this fails for now, we need to make log_details undeletable - # we should probably check for more extensions here + generic_conf_file def test_extension_uppercase(self, tmpdir): file_path = tmpdir.join('TEST.TXT') @@ -111,43 +101,42 @@ def test_extension_uppercase(self, tmpdir): assert file.extension == '.txt' def test_mimetypes(self, generic_conf_file): - assert generic_conf_file.has_mimetype() assert generic_conf_file.mimetype == 'text/plain' assert generic_conf_file.main_type == 'text' assert generic_conf_file.sub_type == 'plain' + assert generic_conf_file.has_mimetype # Need to test something without a mimetype # Need to test something that's a directory # Need to test something that causes the unicode exception def test_has_mimetype_no_main_type(self, generic_conf_file): generic_conf_file.main_type = '' - assert generic_conf_file.has_mimetype() is False + assert generic_conf_file.has_mimetype is False def test_has_mimetype_no_sub_type(self, generic_conf_file): generic_conf_file.sub_type = '' - assert generic_conf_file.has_mimetype() is False + assert generic_conf_file.has_mimetype is False def test_has_extension(self, temp_file, temp_file_no_ext): - assert temp_file.has_extension() is True - assert temp_file_no_ext.has_extension() is False - assert temp_file_no_ext.log_details.get('no_extension') is True + assert temp_file.has_extension is True + print(temp_file_no_ext.extension) + assert temp_file_no_ext.has_extension is False - def test_add_log_details(self, generic_conf_file): - generic_conf_file.add_log_details('test', True) - assert generic_conf_file.log_details['test'] is True - with pytest.raises(KeyError): - assert generic_conf_file.log_details['wrong'] is False + def test_set_property(self, generic_conf_file): + generic_conf_file.set_property('test', True) + assert generic_conf_file.get_property('test') is True + assert generic_conf_file.get_property('wrong') is None def test_marked_dangerous(self, file_marked_all_parameterized): file_marked_all_parameterized.make_dangerous() - assert file_marked_all_parameterized.is_dangerous() is True + assert file_marked_all_parameterized.is_dangerous is True # Should work regardless of weird paths?? # Should check file path alteration behavior as well def test_generic_dangerous(self, generic_conf_file): - assert generic_conf_file.is_dangerous() is False + assert generic_conf_file.is_dangerous is False generic_conf_file.make_dangerous() - assert generic_conf_file.is_dangerous() is True + assert generic_conf_file.is_dangerous is True def test_has_symlink(self, tmpdir): file_path = tmpdir.join('test.txt') @@ -155,64 +144,88 @@ def test_has_symlink(self, tmpdir): file_path = file_path.strpath symlink_path = tmpdir.join('symlinked.txt') symlink_path = symlink_path.strpath - file_symlink = os.symlink(file_path, symlink_path) + os.symlink(file_path, symlink_path) file = FileBase(file_path, file_path) symlink = FileBase(symlink_path, symlink_path) - assert file.is_symlink() is False - assert symlink.is_symlink() is True + assert file.is_symlink is False + assert symlink.is_symlink is True - def test_has_symlink_fixture(self, symlink): - assert symlink.is_symlink() is True + def test_has_symlink_fixture(self, symlink_file): + assert symlink_file.is_symlink is True def test_generic_make_unknown(self, generic_conf_file): - assert generic_conf_file.log_details.get('unknown') is None + assert generic_conf_file.is_unknown is False generic_conf_file.make_unknown() - assert generic_conf_file.log_details.get('unknown') is True + assert generic_conf_file.is_unknown # given a FileBase object with no marking, should do the right things def test_marked_make_unknown(self, file_marked_all_parameterized): file = file_marked_all_parameterized - if file.log_details.get('unknown'): + if file.is_unknown: file.make_unknown() - assert file.log_details.get('unknown') is True + assert file.is_unknown else: - assert file.log_details.get('unknown') is None + assert file.is_unknown is False file.make_unknown() - assert file.log_details.get('unknown') is None + assert file.is_unknown is False # given a FileBase object with an unrecognized marking, should ??? def test_generic_make_binary(self, generic_conf_file): - assert generic_conf_file.log_details.get('binary') is None + assert generic_conf_file.is_binary is False generic_conf_file.make_binary() - assert generic_conf_file.log_details.get('binary') is True + assert generic_conf_file.is_binary def test_marked_make_binary(self, file_marked_all_parameterized): file = file_marked_all_parameterized - if file.log_details.get('dangerous'): + if file.is_dangerous: file.make_binary() - assert file.log_details.get('binary') is None + assert file.is_binary is False else: file.make_binary() - assert file.log_details.get('binary') is True + assert file.is_binary def test_force_ext_change(self, generic_conf_file): - assert generic_conf_file.has_extension() - assert generic_conf_file.extension == '.conf' + assert generic_conf_file.has_extension + assert generic_conf_file.get_property('extension') == '.conf' assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf' generic_conf_file.force_ext('.txt') assert os.path.splitext(generic_conf_file.dst_path)[1] == '.txt' - assert generic_conf_file.log_details.get('force_ext') is True - # should make a file's extension change + assert generic_conf_file.get_property('force_ext') is True + assert generic_conf_file.get_property('extension') == '.txt' # should be able to handle weird paths def test_force_ext_correct(self, generic_conf_file): - assert generic_conf_file.has_extension() - assert generic_conf_file.extension == '.conf' + assert generic_conf_file.has_extension + assert generic_conf_file.get_property('extension') == '.conf' generic_conf_file.force_ext('.conf') assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf' - assert generic_conf_file.log_details.get('force_ext') is None + assert generic_conf_file.get_property('force_ext') is None # shouldn't change a file's extension if it already is right + def test_create_metadata_file(self, temp_file): + # Try making a metadata file + metadata_file_path = temp_file.create_metadata_file('.metadata.txt') + with open(metadata_file_path, 'w+') as metadata_file: + metadata_file.write('Have some metadata!') + # Shouldn't be able to make a metadata file with no extension + assert temp_file.create_metadata_file('') is False + # if metadata file already exists + # if there is no metadata to write should this work? + + def test_safe_copy(self, generic_conf_file): + generic_conf_file.safe_copy() + # check that safe copy can handle weird file path inputs + + +class TestLogger: + + @fixture + def generic_logger(self, tmpdir): + return GroomerLogger(tmpdir.strpath) + + def test_tree(self, generic_logger): + generic_logger.tree(generic_logger.root_dir) + class TestKittenGroomerBase: @@ -236,39 +249,6 @@ def test_instantiation(self, source_directory, dest_directory): debug_groomer = KittenGroomerBase(source_directory, dest_directory, debug=True) - # we should maybe protect access to self.current_file in some way? - - def test_computehash(self, tmpdir): - file = tmpdir.join('test.txt') - file.write('testing') - simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath) - simple_groomer._computehash(file.strpath) - - def test_tree(self, generic_groomer): - generic_groomer.tree(generic_groomer.src_root_dir) - - def test_safe_copy(self, tmpdir): - file = tmpdir.join('test.txt') - file.write('testing') - testdir = tmpdir.join('testdir') - os.mkdir(testdir.strpath) - filedest = testdir.join('test.txt') - simple_groomer = KittenGroomerBase(tmpdir.strpath, testdir.strpath) - simple_groomer.cur_file = FileBase(file.strpath, filedest.strpath) - assert simple_groomer._safe_copy() is True - #check that it handles weird file path inputs - - def test_safe_metadata_split(self, tmpdir): - file = tmpdir.join('test.txt') - file.write('testing') - simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath) - simple_groomer.cur_file = FileBase(file.strpath, file.strpath) - metadata_file = simple_groomer._safe_metadata_split('metadata.log') - metadata_file.write('Have some metadata!') - metadata_file.close() - assert simple_groomer._safe_metadata_split('') is False - # if metadata file already exists - # if there is no metadata to write should this work? def test_list_all_files(self, tmpdir): file = tmpdir.join('test.txt') @@ -276,15 +256,6 @@ def test_list_all_files(self, tmpdir): testdir = tmpdir.join('testdir') os.mkdir(testdir.strpath) simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath) - files = simple_groomer._list_all_files(simple_groomer.src_root_dir) + files = simple_groomer.list_all_files(simple_groomer.src_root_dir) assert file.strpath in files assert testdir.strpath not in files - - def test_print_log(self, generic_groomer): - with pytest.raises(AttributeError): - generic_groomer._print_log() - # Kind of a bad test, but this should be implemented by the user anyway - - def test_processdir(self, generic_groomer): - with pytest.raises(ImplementationRequired): - generic_groomer.processdir() diff --git a/tests/testfile_catalog.md b/tests/testfile_catalog.md new file mode 100644 index 0000000..692daf8 --- /dev/null +++ b/tests/testfile_catalog.md @@ -0,0 +1,12 @@ +src_invalid +=========== + +- + + + +src_valid +========= + +- Example.jpg: image/jpeg, obtained from wikipedia.org +- blah.conf: text file with a .conf extension \ No newline at end of file