diff --git a/Makefile b/Makefile index ca297c7..ceb3659 100644 --- a/Makefile +++ b/Makefile @@ -50,6 +50,10 @@ check-mypy: \ source venv/bin/activate \ && mypy \ indxparse + source venv/bin/activate \ + && mypy \ + --strict \ + indxparse/MFTINDX.py check-third_party: $(MAKE) \ diff --git a/indxparse/MFT.py b/indxparse/MFT.py index e65047d..0fcb060 100755 --- a/indxparse/MFT.py +++ b/indxparse/MFT.py @@ -501,6 +501,7 @@ def __init__( self.entry_list_start: typing.Callable[[], int] self.declare_field("dword", "entry_list_end") + self.entry_list_end: typing.Callable[[], int] self.declare_field("dword", "entry_list_allocation_end") self.entry_list_allocation_end: typing.Callable[[], int] @@ -514,7 +515,7 @@ def __init__( self.entry_list_allocation_end() - self.entry_list_start(), ) - def entries(self): + def entries(self) -> typing.Iterator[IndexEntry]: """ A generator that returns each INDX entry associated with this node. """ @@ -528,7 +529,7 @@ def entries(self): offset += e.length() yield e - def slack_entries(self): + def slack_entries(self) -> typing.Iterator[SlackIndexEntry]: """ A generator that yields INDX entries found in the slack space associated with this header. 
@@ -569,7 +570,7 @@ def __init__( self.declare_field("byte", "unused3") self._node_header_offset = self.current_field_offset() - def node_header(self): + def node_header(self) -> NTATTR_STANDARD_INDEX_HEADER: return NTATTR_STANDARD_INDEX_HEADER( self._buf, self.offset() + self._node_header_offset, self ) @@ -586,6 +587,7 @@ def __init__( super(IndexRecordHeader, self).__init__(buf, offset, parent) self.declare_field("dword", "magic", 0x0) + self.magic: typing.Callable[[], int] self.declare_field("word", "usa_offset") self.usa_offset: typing.Callable[[], int] @@ -600,7 +602,7 @@ def __init__( self._node_header_offset = self.current_field_offset() self.fixup(self.usa_count(), self.usa_offset()) - def node_header(self): + def node_header(self) -> NTATTR_STANDARD_INDEX_HEADER: return NTATTR_STANDARD_INDEX_HEADER( self._buf, self.offset() + self._node_header_offset, self ) @@ -669,6 +671,7 @@ def __init__( self.declare_field("qword", "mft_reference", 0x0) self.declare_field("word", "length") + self.length: typing.Callable[[], int] self.declare_field("word", "filename_information_length") self.filename_information_length: typing.Callable[[], int] @@ -681,12 +684,13 @@ def __init__( self.current_field_offset(), self.filename_information_length(), ) + self._off_filename_information_buffer: int self.declare_field( "qword", "child_vcn", align(self.current_field_offset(), 0x8) ) - def filename_information(self): + def filename_information(self) -> FilenameAttribute: return FilenameAttribute( self._buf, self.offset() + self._off_filename_information_buffer, self ) @@ -724,6 +728,7 @@ def __init__( self.accessed_time: typing.Callable[[], datetime] self.declare_field("dword", "attributes") + self.attributes: typing.Callable[[], int] self.declare_field("binary", "reserved", self.current_field_offset(), 0xC) @@ -741,7 +746,7 @@ def __init__( # def __len__(self): # return 0x42 + (self.filename_length() * 2) - def owner_id(self): + def owner_id(self) -> int: """ This is an explicit 
method because it may not exist in OSes under Win2k @@ -763,7 +768,7 @@ def security_id(self) -> int: except OverrunBufferException: raise StandardInformationFieldDoesNotExist("Security ID") - def quota_charged(self): + def quota_charged(self) -> int: """ This is an explicit method because it may not exist in OSes under Win2k @@ -774,7 +779,7 @@ def quota_charged(self): except OverrunBufferException: raise StandardInformationFieldDoesNotExist("Quota Charged") - def usn(self): + def usn(self) -> int: """ This is an explicit method because it may not exist in OSes under Win2k @@ -797,6 +802,7 @@ def __init__( super(FilenameAttribute, self).__init__(buf, offset) self.declare_field("qword", "mft_parent_reference", 0x0) + self.mft_parent_reference: typing.Callable[[], int] self.declare_field("filetime", "created_time") self.created_time: typing.Callable[[], datetime] @@ -811,11 +817,13 @@ def __init__( self.accessed_time: typing.Callable[[], datetime] self.declare_field("qword", "physical_size") + self.physical_size: typing.Callable[[], int] self.declare_field("qword", "logical_size") self.logical_size: typing.Callable[[], int] self.declare_field("dword", "flags") + self.flags: typing.Callable[[], int] self.declare_field("dword", "reparse_value") @@ -901,10 +909,12 @@ def __init__( self.declare_field( "binary", "length_binary", self.current_field_offset(), self._length_length ) + self.length_binary: typing.Callable[[], array.array] self.declare_field( "binary", "offset_binary", self.current_field_offset(), self._offset_length ) + self.offset_binary: typing.Callable[[], array.array] @staticmethod def structure_size( @@ -947,11 +957,11 @@ def lsb2signednum(self, binary: array.array): ret *= -1 return ret - def offset(self): + def offset(self) -> int: # TODO(wb): make this run_offset return self.lsb2signednum(self.offset_binary()) - def length(self): + def length(self) -> int: # TODO(wb): make this run_offset return self.lsb2num(self.length_binary()) @@ -984,7 +994,7 @@ 
def structure_size( def __len__(self): return sum(map(len, self._entries())) - def _entries(self, length=None): + def _entries(self, length: typing.Optional[int] = None) -> typing.List[Runentry]: ret = [] offset = self.offset() entry = Runentry(self._buf, offset, self) @@ -998,7 +1008,7 @@ def _entries(self, length=None): entry = Runentry(self._buf, offset, self) return ret - def runs(self, length=None): + def runs(self, length=None) -> typing.Iterator[typing.Tuple[int, int]]: """ Yields tuples (volume offset, length). Recall that the entries are relative to one another @@ -1084,6 +1094,7 @@ def __init__( self.name_offset: typing.Callable[[], int] self.declare_field("word", "flags") + self.flags: typing.Callable[[], int] self.declare_field("word", "instance") @@ -1093,6 +1104,7 @@ def __init__( self.declare_field("qword", "highest_vcn") self.declare_field("word", "runlist_offset") + self.runlist_offset: typing.Callable[[], int] self.declare_field("byte", "compression_unit") @@ -1107,6 +1119,7 @@ def __init__( self.declare_field("byte", "reserved5") self.declare_field("qword", "allocated_size") + self.allocated_size: typing.Callable[[], int] self.declare_field("qword", "data_size") self.data_size: typing.Callable[[], int] @@ -1143,7 +1156,7 @@ def structure_size( def __len__(self): return self.size() - def runlist(self): + def runlist(self) -> Runlist: return Runlist(self._buf, self.offset() + self.runlist_offset(), self) def size(self): @@ -1159,14 +1172,14 @@ class MFT_RECORD_FLAGS: MFT_RECORD_IS_DIRECTORY = 0x2 -def MREF(mft_reference): +def MREF(mft_reference) -> int: """ Given a MREF/mft_reference, return the record number part. """ return mft_reference & 0xFFFFFFFFFFFF -def MSEQNO(mft_reference): +def MSEQNO(mft_reference) -> int: """ Given a MREF/mft_reference, return the sequence number part. 
""" @@ -1200,6 +1213,7 @@ def __init__( self.declare_field("qword", "lsn") self.declare_field("word", "sequence_number") + self.sequence_number: typing.Callable[[], int] self.declare_field("word", "link_count") @@ -1240,10 +1254,11 @@ def attributes(self) -> typing.Iterator[Attribute]: offset += len(a) yield a - def attribute(self, attr_type): + def attribute(self, attr_type) -> typing.Optional[Attribute]: for a in self.attributes(): if a.type() == attr_type: return a + return None def is_directory(self) -> bool: return bool(self.flags() & MFT_RECORD_FLAGS.MFT_RECORD_IS_DIRECTORY) @@ -1281,6 +1296,8 @@ def filename_information(self) -> typing.Optional[FilenameAttribute]: def standard_information(self) -> typing.Optional[StandardInformation]: try: attr = self.attribute(ATTR_TYPE.STANDARD_INFORMATION) + if attr is None: + return None return StandardInformation(attr.value(), 0, self) except AttributeError: return None @@ -1395,7 +1412,7 @@ def record_generator(self, start_at=0) -> typing.Iterator[MFTRecord]: logging.debug("Yielding record %d", count) yield record - def mft_get_record_buf(self, number): + def mft_get_record_buf(self, number: int) -> array.array: if self.filetype == "indx": return array.array("B", "") if self.filetype == "mft": @@ -1410,8 +1427,11 @@ def mft_get_record_buf(self, number): f.seek(self.mftoffset) f.seek(number * 1024, 1) return array.array("B", f.read(1024)) + raise ValueError( + "Retrieval method not defined for self.filetype = %s." 
% self.filetype + ) - def mft_get_record(self, number): + def mft_get_record(self, number: int) -> MFTRecord: buf = self.mft_get_record_buf(number) if buf == array.array("B", ""): raise InvalidMFTRecordNumber(number) @@ -1426,7 +1446,11 @@ def mft_get_record(self, number): + str(r.mft_record_number()) + str(r.flags()), ) - def mft_record_build_path(self, record, cycledetector=None): + def mft_record_build_path( + self, + record: MFTRecord, + cycledetector: typing.Optional[typing.Dict[int, bool]] = None, + ) -> str: if cycledetector is None: cycledetector = {} rec_num = record.mft_record_number() & 0xFFFFFFFFFFFF @@ -1454,7 +1478,7 @@ def mft_record_build_path(self, record, cycledetector=None): cycledetector[rec_num] = True return self.mft_record_build_path(parent, cycledetector) + "\\" + fn.filename() - def mft_get_record_by_path(self, path): + def mft_get_record_by_path(self, path) -> typing.Optional[MFTRecord]: # TODO could optimize here by trying to use INDX buffers # and actually walk through the FS count = -1 @@ -1468,9 +1492,9 @@ def mft_get_record_by_path(self, path): if record_path.lower() != path.lower(): continue return record - return False + return None - def read(self, offset, length): + def read(self, offset, length) -> array.array: if self.filetype == "image": with open(self.filename, "rb") as f: f.seek(offset) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 0f96333..b7fa4ea 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -23,15 +23,43 @@ # # # Version v.1.2.0 +import argparse +import array import calendar - -from indxparse.MFT import * +import logging +import re +import sys +from datetime import datetime +from typing import Any, List, Optional, Union + +from indxparse.BinaryParser import OverrunBufferException +from indxparse.MFT import ( + ATTR_TYPE, + MREF, + MSEQNO, + NTATTR_STANDARD_INDEX_HEADER, + Attribute, + FilenameAttribute, + IndexRecordHeader, + IndexRootHeader, + InvalidAttributeException, + MFTRecord, + 
NTFSFile, + StandardInformation, + StandardInformationFieldDoesNotExist, +) verbose = False -import argparse -def information_bodyfile(path, size, inode, owner_id, info, attributes=None): +def information_bodyfile( + path: str, + size: int, + inode: int, + owner_id: int, + info: Union[FilenameAttribute, StandardInformation], + attributes: Optional[List[str]] = None, +) -> str: if not attributes: attributes = [] try: @@ -65,7 +93,9 @@ def information_bodyfile(path, size, inode, owner_id, info, attributes=None): ) -def record_bodyfile(ntfsfile, record, inode=None, attributes=None): +def record_bodyfile( + ntfsfile: NTFSFile, record: MFTRecord, attributes: Optional[List[str]] = None +) -> str: """ Return a bodyfile formatted string for the given MFT record. The string contains metadata for the one file described by the record. @@ -141,14 +171,21 @@ def record_bodyfile(ntfsfile, record, inode=None, attributes=None): return ret -def node_header_bodyfile(options, node_header, basepath): +def node_header_bodyfile( + node_header: NTATTR_STANDARD_INDEX_HEADER, + basepath: str, + *args: Any, + indxlist: bool, + slack: bool, + **kwargs: Any, +) -> str: """ Returns a bodyfile formatted string for all INDX entries following the given INDX node header. 
""" ret = "" attrs = ["filename", "INDX"] - if options.indxlist: + if indxlist: for e in node_header.entries(): path = basepath + "\\" + e.filename_information().filename() size = e.filename_information().logical_size() @@ -157,7 +194,7 @@ def node_header_bodyfile(options, node_header, basepath): path, size, inode, 0, e.filename_information(), attributes=attrs ) attrs.append("slack") - if options.slack: + if slack: for e in node_header.slack_entries(): path = basepath + "\\" + e.filename_information().filename() size = e.filename_information().logical_size() @@ -168,7 +205,16 @@ def node_header_bodyfile(options, node_header, basepath): return ret -def record_indx_entries_bodyfile(options, ntfsfile, record): +def record_indx_entries_bodyfile( + ntfsfile: NTFSFile, + record: MFTRecord, + *args: Any, + clustersize: int, + indxlist: bool, + offset: int, + slack: bool, + **kwargs: Any, +) -> str: """ Returns a bodyfile formatted string for all INDX entries associated with the given MFT record @@ -185,9 +231,9 @@ def record_indx_entries_bodyfile(options, ntfsfile, record): # TODO this shouldn't happen. 
pass else: - irh = IndexRootHeader(indxroot.value(), 0, False) - nh = irh.node_header() - ret += node_header_bodyfile(options, nh, basepath) + iroh = IndexRootHeader(indxroot.value(), 0, False) + nh = iroh.node_header() + ret += node_header_bodyfile(nh, basepath, indxlist=indxlist, slack=slack) extractbuf = array.array("B") for attr in record.attributes(): if attr.type() != ATTR_TYPE.INDEX_ALLOCATION: @@ -195,8 +241,8 @@ continue if attr.non_resident() != 0: - for offset, length in attr.runlist().runs(): + for run_offset, length in attr.runlist().runs(): try: - ooff = offset * options.clustersize + options.offset - llen = length * options.clustersize + ooff = run_offset * clustersize + offset + llen = length * clustersize extractbuf += f.read(ooff, llen) except IOError: pass @@ -206,25 +252,25 @@ return ret offset = 0 try: - irh = IndexRecordHeader(extractbuf, offset, False) + ireh = IndexRecordHeader(extractbuf, offset, False) except OverrunBufferException: return ret # TODO could miss something if there is an empty, valid record at the end - while irh.magic() == 0x58444E49: - nh = irh.node_header() - ret += node_header_bodyfile(options, nh, basepath) + while ireh.magic() == 0x58444E49: + nh = ireh.node_header() + ret += node_header_bodyfile(nh, basepath, indxlist=indxlist, slack=slack) # TODO get this from the boot record - offset += options.clustersize + offset += clustersize if offset + 4096 > len(extractbuf): # TODO make this INDX record size return ret try: - irh = IndexRecordHeader(extractbuf, offset, False) + ireh = IndexRecordHeader(extractbuf, offset, False) except OverrunBufferException: return ret return ret -def try_write(s): +def try_write(s: str) -> None: try: sys.stdout.write(s) except (UnicodeEncodeError, UnicodeDecodeError): @@ -233,7 +279,15 @@ ) -def print_nonresident_indx_bodyfile(options, buf, basepath=""): +def print_nonresident_indx_bodyfile( + buf: array.array[Any], + 
basepath: str = "", + *args: Any, + clustersize: int, + indxlist: bool, + slack: bool, + **kwargs: Any, +) -> None: offset = 0 try: irh = IndexRecordHeader(buf, offset, False) @@ -242,8 +296,8 @@ def print_nonresident_indx_bodyfile(options, buf, basepath=""): # TODO could miss something if there is an empty, valid record at the end while irh.magic() == 0x58444E49: nh = irh.node_header() - try_write(node_header_bodyfile(options, nh, basepath)) - offset += options.clustersize + try_write(node_header_bodyfile(nh, basepath, indxlist=indxlist, slack=slack)) + offset += clustersize if offset + 4096 > len(buf): # TODO make this INDX record size return try: @@ -253,38 +307,63 @@ def print_nonresident_indx_bodyfile(options, buf, basepath=""): return -def print_bodyfile(options): - if options.filetype == "mft" or options.filetype == "image": - f = NTFSFile( - clustersize=options.clustersize, - filename=options.filename, - filetype=options.filetype, - offset=options.offset, - prefix=options.prefix, - progress=options.progress, +def print_bodyfile( + *args: Any, + clustersize: int, + deleted: bool, + filename: str, + filetype: str, + filter_pattern: str, + indxlist: bool, + mftlist: bool, + offset: int, + prefix: str, + progress: bool, + slack: bool, + **kwargs: Any, +) -> None: + if filetype == "mft" or filetype == "image": + ntfs_file = NTFSFile( + clustersize=clustersize, + filename=filename, + filetype=filetype, + offset=offset, + prefix=prefix, + progress=progress, ) - if options.filter: - refilter = re.compile(options.filter) - for record in f.record_generator(): + if filter_pattern: + refilter = re.compile(filter_pattern) + for record in ntfs_file.record_generator(): logging.debug("Considering MFT record %s" % (record.mft_record_number())) try: if record.magic() != 0x454C4946: logging.debug("Record has a bad magic value") continue - if options.filter: - path = f.mft_record_build_path(record, {}) + if filter_pattern: + path = ntfs_file.mft_record_build_path(record, {}) 
if not refilter.search(path): logging.debug( "Skipping listing path " "due to regex filter: " + path ) continue - if record.is_active() and options.mftlist: - try_write(record_bodyfile(f, record)) - if options.indxlist or options.slack: - try_write(record_indx_entries_bodyfile(options, f, record)) - elif (not record.is_active()) and options.deleted: - try_write(record_bodyfile(f, record, attributes=["deleted"])) - if options.filetype == "image" and (options.indxlist or options.slack): + if record.is_active() and mftlist: + try_write(record_bodyfile(ntfs_file, record)) + if indxlist or slack: + try_write( + record_indx_entries_bodyfile( + ntfs_file, + record, + clustersize=clustersize, + indxlist=indxlist, + offset=offset, + slack=slack, + ) + ) + elif (not record.is_active()) and deleted: + try_write( + record_bodyfile(ntfs_file, record, attributes=["deleted"]) + ) + if filetype == "image" and (indxlist or slack): extractbuf = array.array("B") found_indxalloc = False for attr in record.attributes(): @@ -293,43 +372,61 @@ found_indxalloc = True if attr.non_resident() != 0: - for offset, length in attr.runlist().runs(): + for run_offset, length in attr.runlist().runs(): - ooff = offset * options.clustersize + options.offset - llen = length * options.clustersize - extractbuf += f.read(ooff, llen) + ooff = run_offset * clustersize + offset + llen = length * clustersize + extractbuf += ntfs_file.read(ooff, llen) else: pass # This shouldn't happen. 
if found_indxalloc and len(extractbuf) > 0: - path = f.mft_record_build_path(record, {}) + path = ntfs_file.mft_record_build_path(record, {}) print_nonresident_indx_bodyfile( - options, extractbuf, basepath=path + extractbuf, + basepath=path, + clustersize=clustersize, + indxlist=indxlist, + slack=slack, ) except InvalidAttributeException: pass - elif options.filetype == "indx": - with open(options.filename, "rb") as f: - buf = array.array("B", f.read()) - print_nonresident_indx_bodyfile(options, buf) + elif filetype == "indx": + with open(filename, "rb") as fh: + buf = array.array("B", fh.read()) + print_nonresident_indx_bodyfile( + buf, clustersize=clustersize, indxlist=indxlist, slack=slack + ) -def print_indx_info(options): +def print_indx_info( + *args: Any, + clustersize: int, + extract: str, + filename: str, + filetype: str, + infomode: str, + offset: int, + prefix: str, + progress: bool, + **kwargs: Any, +) -> None: f = NTFSFile( - clustersize=options.clustersize, - filename=options.filename, - filetype=options.filetype, - offset=options.offset, - prefix=options.prefix, - progress=options.progress, + clustersize=clustersize, + filename=filename, + filetype=filetype, + offset=offset, + prefix=prefix, + progress=progress, ) + record: Optional[MFTRecord] = None try: - record_num = int(options.infomode) + record_num = int(infomode) record_buf = f.mft_get_record_buf(record_num) record = MFTRecord(record_buf, 0, False) except ValueError: - record = f.mft_get_record_by_path(options.infomode) - if not record: - print("Did not find directory entry for " + options.infomode) + record = f.mft_get_record_by_path(infomode) + if record is None: + print("Did not find directory entry for " + infomode) return - print("Found directory entry for: " + options.infomode) + print("Found directory entry for: " + infomode) if record.magic() != 0x454C4946: if record.magic() == int("0xBAAD", 0x10): @@ -357,9 +454,11 @@ def print_indx_info(options): if data_attr and 
data_attr.non_resident() > 0: print(" size: %d bytes" % (data_attr.data_size())) else: - print(" size: %d bytes" % (record.filename_information().logical_size())) + rfni = record.filename_information() + if rfni is not None: + print(" size: %d bytes" % (rfni.logical_size())) - def get_flags(flags): + def get_flags(flags: int) -> List[str]: attributes = [] if flags & 0x01: attributes.append("readonly") @@ -397,57 +496,56 @@ def get_flags(flags): attributes.append("has-view-index") return attributes - print( - " attributes: " - + ", ".join(get_flags(record.standard_information().attributes())) - ) + rsi = record.standard_information() + if rsi is None: + print(" SI not found") + else: + print(" attributes: " + ", ".join(get_flags(rsi.attributes()))) - crtime = record.standard_information().created_time().isoformat("T") + "Z" - mtime = record.standard_information().modified_time().isoformat("T") + "Z" - chtime = record.standard_information().changed_time().isoformat("T") + "Z" - atime = record.standard_information().accessed_time().isoformat("T") + "Z" + crtime = rsi.created_time().isoformat("T") + "Z" + mtime = rsi.modified_time().isoformat("T") + "Z" + chtime = rsi.changed_time().isoformat("T") + "Z" + atime = rsi.accessed_time().isoformat("T") + "Z" - print(" SI modified: %s" % (mtime)) - print(" SI accessed: %s" % (atime)) - print(" SI changed: %s" % (chtime)) - print(" SI birthed: %s" % (crtime)) + print(" SI modified: %s" % (mtime)) + print(" SI accessed: %s" % (atime)) + print(" SI changed: %s" % (chtime)) + print(" SI birthed: %s" % (crtime)) - try: - # since the fields are sequential, we can handle an exception half way through here - # and then ignore the remaining items. 
Dont have to worry about individual try/catches - print( - " owner id (quota info): %d" % (record.standard_information().owner_id()) - ) - print(" security id: %d" % (record.standard_information().security_id())) - print(" quota charged: %d" % (record.standard_information().quota_charged())) - print(" USN: %d" % (record.standard_information().usn())) - except StandardInformationFieldDoesNotExist: - pass + try: + # since the fields are sequential, we can handle an exception half way through here + # and then ignore the remaining items. Dont have to worry about individual try/catches + print(" owner id (quota info): %d" % (rsi.owner_id())) + print(" security id: %d" % (rsi.security_id())) + print(" quota charged: %d" % (rsi.quota_charged())) + print(" USN: %d" % (rsi.usn())) + except StandardInformationFieldDoesNotExist: + pass print("Filenames:") for b in record.attributes(): if b.type() != ATTR_TYPE.FILENAME_INFORMATION: continue try: - attr = FilenameAttribute(b.value(), 0, record) - a = attr.filename_type() + fnattr = FilenameAttribute(b.value(), 0, record) + a = fnattr.filename_type() print(" Type: %s" % (["POSIX", "WIN32", "DOS 8.3", "WIN32 + DOS 8.3"][a])) - print(" name: %s" % (str(attr.filename()))) - print(" attributes: " + ", ".join(get_flags(attr.flags()))) - print(" logical size: %d bytes" % (attr.logical_size())) - print(" physical size: %d bytes" % (attr.physical_size())) + print(" name: %s" % (str(fnattr.filename()))) + print(" attributes: " + ", ".join(get_flags(fnattr.flags()))) + print(" logical size: %d bytes" % (fnattr.logical_size())) + print(" physical size: %d bytes" % (fnattr.physical_size())) - crtime = attr.created_time().isoformat("T") + "Z" - mtime = attr.modified_time().isoformat("T") + "Z" - chtime = attr.changed_time().isoformat("T") + "Z" - atime = attr.accessed_time().isoformat("T") + "Z" + crtime = fnattr.created_time().isoformat("T") + "Z" + mtime = fnattr.modified_time().isoformat("T") + "Z" + chtime = 
fnattr.changed_time().isoformat("T") + "Z" + atime = fnattr.accessed_time().isoformat("T") + "Z" print(" modified: %s" % (mtime)) print(" accessed: %s" % (atime)) print(" changed: %s" % (chtime)) print(" birthed: %s" % (crtime)) - print(" parent ref: %d" % (MREF(attr.mft_parent_reference()))) - print(" parent seq: %d" % (MSEQNO(attr.mft_parent_reference()))) + print(" parent ref: %d" % (MREF(fnattr.mft_parent_reference()))) + print(" parent seq: %d" % (MSEQNO(fnattr.mft_parent_reference()))) except ZeroDivisionError: continue @@ -455,7 +553,7 @@ def get_flags(flags): for b in record.attributes(): print(" %s" % (Attribute.TYPES[b.type()])) print(" attribute name: %s" % (b.name() or "")) - print(" attribute flags: " + ", ".join(get_flags(attr.flags()))) + print(" attribute flags: " + ", ".join(get_flags(b.flags()))) if b.non_resident() > 0: print(" resident: no") print(" data size: %d" % (b.data_size())) @@ -468,10 +566,10 @@ def get_flags(flags): print( " %s (%s) bytes for %s (%s) bytes" % ( - offset * options.clustersize, - hex(offset * options.clustersize), - length * options.clustersize, - hex(length * options.clustersize), + offset * clustersize, + hex(offset * clustersize), + length * clustersize, + hex(length * clustersize), ) ) else: @@ -487,31 +585,41 @@ def get_flags(flags): if indxroot.non_resident() != 0: # This shouldn't happen. 
print("INDX_ROOT attribute is non-resident") - for e in indxroot.runlist().entries(): - print("Cluster %s, length %s" % (hex(e.offset()), hex(e.length()))) + for rle in indxroot.runlist()._entries(): + print("Cluster %s, length %s" % (hex(rle.offset()), hex(rle.length()))) else: print("INDX_ROOT attribute is resident") irh = IndexRootHeader(indxroot.value(), 0, False) someentries = False - for e in irh.node_header().entries(): + for nhe in irh.node_header().entries(): if not someentries: print("INDX_ROOT entries:") someentries = True - print(" " + e.filename_information().filename()) + print(" " + nhe.filename_information().filename()) print( - " " + str(e.filename_information().logical_size()) + " bytes in size" + " " + + str(nhe.filename_information().logical_size()) + + " bytes in size" ) print( - " b " + e.filename_information().created_time().isoformat("T") + "Z" + " b " + + nhe.filename_information().created_time().isoformat("T") + + "Z" ) print( - " m " + e.filename_information().modified_time().isoformat("T") + "Z" + " m " + + nhe.filename_information().modified_time().isoformat("T") + + "Z" ) print( - " c " + e.filename_information().changed_time().isoformat("T") + "Z" + " c " + + nhe.filename_information().changed_time().isoformat("T") + + "Z" ) print( - " a " + e.filename_information().accessed_time().isoformat("T") + "Z" + " a " + + nhe.filename_information().accessed_time().isoformat("T") + + "Z" ) if not someentries: @@ -526,30 +634,30 @@ def get_flags(flags): print("INDX_ROOT slack entries: (none)") extractbuf = array.array("B") found_indxalloc = False - for attr in record.attributes(): - if attr.type() != ATTR_TYPE.INDEX_ALLOCATION: + for rattr in record.attributes(): + if rattr.type() != ATTR_TYPE.INDEX_ALLOCATION: continue found_indxalloc = True print("Found INDX_ALLOCATION attribute") - if attr.non_resident() != 0: + if rattr.non_resident() != 0: print("INDX_ALLOCATION is non-resident") - for offset, length in attr.runlist().runs(): + for run_offset, 
length in rattr.runlist().runs(): - print("Cluster %s, length %s" % (hex(offset), hex(length))) + print("Cluster %s, length %s" % (hex(run_offset), hex(length))) print( " Using clustersize %s (%s) bytes and volume offset %s (%s) bytes: \n %s (%s) bytes for %s (%s) bytes" % ( - options.clustersize, - hex(options.clustersize), - options.offset, - hex(options.offset), - (offset * options.clustersize) + options.offset, - hex((offset * options.clustersize) + options.offset), - length * options.clustersize, - hex(length * options.clustersize), + clustersize, + hex(clustersize), + offset, + hex(offset), + (run_offset * clustersize) + offset, + hex((run_offset * clustersize) + offset), + length * clustersize, + hex(length * clustersize), ) ) - ooff = offset * options.clustersize + options.offset - llen = length * options.clustersize + ooff = run_offset * clustersize + offset + llen = length * clustersize extractbuf += f.read(ooff, llen) else: # This shouldn't happen. @@ -557,13 +665,13 @@ def get_flags(flags): if not found_indxalloc: print("No INDX_ALLOCATION attribute found") return - if options.extract: - with open(options.extract, "wb") as g: + if extract: + with open(extract, "wb") as g: g.write(extractbuf) return -def main(): +def main() -> None: parser = argparse.ArgumentParser(description="Parse NTFS " "filesystem structures.") parser.add_argument( "-t", "--filetype", action="store", metavar="regex", nargs=1, - dest="filter", + dest="filter_pattern", help="Only consider entries whose path " "matches this regular expression", ) parser.add_argument( @@ -779,19 +887,40 @@ ): logging.error("You must choose a mode (-i/-l/-s/-m/-d)") - if results.filter: - results.filter = results.filter[0] + if results.filter_pattern: + results.filter_pattern = results.filter_pattern[0] logging.info( "Asked to only list file entry information " - "for paths matching the regular expression: " + results.filter_pattern ) if results.infomode: logging.warning("This filter has 
no meaning with information mode (-i)") if results.infomode: - print_indx_info(results) + print_indx_info( + clustersize=results.clustersize, + extract=results.extract, + filename=results.filename, + filetype=results.filetype, + infomode=results.infomode, + offset=results.offset, + prefix=results.prefix, + progress=results.progress, + ) elif results.indxlist or results.slack or results.mftlist or results.deleted: - print_bodyfile(results) + print_bodyfile( + clustersize=results.clustersize, + deleted=results.deleted, + filename=results.filename, + filetype=results.filetype, + filter_pattern=results.filter_pattern, + indxlist=results.indxlist, + mftlist=results.mftlist, + offset=results.offset, + prefix=results.prefix, + progress=results.progress, + slack=results.slack, + ) if __name__ == "__main__": diff --git a/indxparse/MFTView.py b/indxparse/MFTView.py index f70acd7..7fbe251 100755 --- a/indxparse/MFTView.py +++ b/indxparse/MFTView.py @@ -191,7 +191,7 @@ class RecordConflict(Exception): def __init__(self, count): self.value = count - def add_node(mftfile, record): + def add_node(mftfile: NTFSFile, record: MFTRecord) -> None: """ Add the given record to the internal list of nodes, adding the parent nodes as appropriate.