From b5b05a85e43e96b6487142ea594baa4abb30d7da Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 16:06:58 -0400 Subject: [PATCH 01/12] Use explicit imports The line `from indxparse.MFT import *` obscured symbol definitions from `mypy`, and is also flagged as a code style issue by `flake8`. This patch makes the symbol imports explicit. In addition to the unit tests of `make check`, this patch is written to pass `flake8` review when tested with the following: ```bash flake8 --ignore=E501,W503 indxparse/MFTINDX.py ``` (`flake8` is intentionally not proposed for addition to CI yet due to needing to address other issues.) Disclaimer: Participation by NIST in the creation of the documentation of mentioned software is not intended to imply a recommendation or endorsement by the National Institute of Standards and Technology, nor is it intended to imply that any specific software is necessarily the best available for the purpose. Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 0f96333..378bc3d 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -23,12 +23,32 @@ # # # Version v.1.2.0 +import argparse +import array import calendar - -from indxparse.MFT import * +import logging +import re +import sys +from datetime import datetime + +from indxparse.BinaryParser import OverrunBufferException +from indxparse.MFT import ( + ATTR_TYPE, + MREF, + MSEQNO, + NTATTR_STANDARD_INDEX_HEADER, + Attribute, + FilenameAttribute, + IndexRecordHeader, + IndexRootHeader, + InvalidAttributeException, + MFTRecord, + NTFSFile, + StandardInformation, + StandardInformationFieldDoesNotExist, +) verbose = False -import argparse def information_bodyfile(path, size, inode, owner_id, info, attributes=None): From f45b3919ccbbb7ac15264f2f4100af8d7e418809 Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 16:43:13 -0400 Subject: [PATCH 02/12] Fix variable reference This was found while reviewing variable name re-use. Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 378bc3d..140e898 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -475,7 +475,7 @@ def get_flags(flags): for b in record.attributes(): print(" %s" % (Attribute.TYPES[b.type()])) print(" attribute name: %s" % (b.name() or "")) - print(" attribute flags: " + ", ".join(get_flags(attr.flags()))) + print(" attribute flags: " + ", ".join(get_flags(b.flags()))) if b.non_resident() > 0: print(" resident: no") print(" data size: %d" % (b.data_size())) From 0aee6b4bdd8620c50a5fc1f44e4efd8f34951f96 Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 16:22:44 -0400 Subject: [PATCH 03/12] Avoid variable name recycling `mypy` assigns a type to a symbol on its first encounter within a function body. When the symbol is reused later, even if for a whole new purpose, the original name assignment is preserved. If the type is incongruous, a type error is raised. Reviewing `MFTINDX.py` with `mypy --strict` showed several instances of variable re-use (including in `for` loops), with reassignment to different types. This patch renames symbols to avoid ambiguity. 
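For illustration only, a minimal, self-contained sketch (hypothetical variable, not code from `MFTINDX.py`) of the reuse pattern `mypy` rejects:

```python
def sketch() -> None:
    item = 42             # mypy infers `item: int` from this first assignment
    print(item + 1)
    # Recycling the name for a new purpose keeps the originally inferred type, so:
    item = "forty-two"    # error: Incompatible types in assignment
                          # (expression has type "str", variable has type "int")
    print(item.upper())
```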
Disclaimer: Participation by NIST in the creation of the documentation of mentioned software is not intended to imply a recommendation or endorsement by the National Institute of Standards and Technology, nor is it intended to imply that any specific software is necessarily the best available for the purpose. Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 94 +++++++++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 41 deletions(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 140e898..e12a917 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -205,8 +205,8 @@ def record_indx_entries_bodyfile(options, ntfsfile, record): # TODO this shouldn't happen. pass else: - irh = IndexRootHeader(indxroot.value(), 0, False) - nh = irh.node_header() + iroh = IndexRootHeader(indxroot.value(), 0, False) + nh = iroh.node_header() ret += node_header_bodyfile(options, nh, basepath) extractbuf = array.array("B") for attr in record.attributes(): @@ -226,19 +226,19 @@ def record_indx_entries_bodyfile(options, ntfsfile, record): return ret offset = 0 try: - irh = IndexRecordHeader(extractbuf, offset, False) + ireh = IndexRecordHeader(extractbuf, offset, False) except OverrunBufferException: return ret # TODO could miss something if there is an empty, valid record at the end - while irh.magic() == 0x58444E49: - nh = irh.node_header() + while ireh.magic() == 0x58444E49: + nh = ireh.node_header() ret += node_header_bodyfile(options, nh, basepath) # TODO get this from the boot record offset += options.clustersize if offset + 4096 > len(extractbuf): # TODO make this INDX record size return ret try: - irh = IndexRecordHeader(extractbuf, offset, False) + ireh = IndexRecordHeader(extractbuf, offset, False) except OverrunBufferException: return ret return ret @@ -275,7 +275,7 @@ def print_nonresident_indx_bodyfile(options, buf, basepath=""): def print_bodyfile(options): if options.filetype == "mft" or options.filetype == "image": - f = NTFSFile( + ntfs_file = NTFSFile( clustersize=options.clustersize, filename=options.filename, filetype=options.filetype, @@ -285,25 +285,27 @@ def print_bodyfile(options): ) if options.filter: refilter = re.compile(options.filter) - for record in f.record_generator(): + for record in ntfs_file.record_generator(): logging.debug("Considering MFT record %s" % (record.mft_record_number())) try: if record.magic() != 0x454C4946: logging.debug("Record has a bad magic value") continue if options.filter: - path = f.mft_record_build_path(record, {}) + path = ntfs_file.mft_record_build_path(record, {}) if not refilter.search(path): logging.debug( "Skipping listing path " "due to regex filter: " + path ) continue if record.is_active() and options.mftlist: - try_write(record_bodyfile(f, record)) + try_write(record_bodyfile(ntfs_file, record)) if options.indxlist or options.slack: - try_write(record_indx_entries_bodyfile(options, f, record)) + try_write(record_indx_entries_bodyfile(options, ntfs_file, record)) elif (not record.is_active()) and options.deleted: - try_write(record_bodyfile(f, record, attributes=["deleted"])) + try_write( + record_bodyfile(ntfs_file, record, attributes=["deleted"]) + ) if options.filetype == "image" and (options.indxlist or options.slack): extractbuf = array.array("B") found_indxalloc = False @@ -315,19 +317,19 @@ def print_bodyfile(options): for offset, length in attr.runlist().runs(): ooff = offset * options.clustersize + options.offset llen = length * options.clustersize - extractbuf += f.read(ooff, llen) + 
extractbuf += ntfs_file.read(ooff, llen) else: pass # This shouldn't happen. if found_indxalloc and len(extractbuf) > 0: - path = f.mft_record_build_path(record, {}) + path = ntfs_file.mft_record_build_path(record, {}) print_nonresident_indx_bodyfile( options, extractbuf, basepath=path ) except InvalidAttributeException: pass elif options.filetype == "indx": - with open(options.filename, "rb") as f: - buf = array.array("B", f.read()) + with open(options.filename, "rb") as fh: + buf = array.array("B", fh.read()) print_nonresident_indx_bodyfile(options, buf) @@ -449,25 +451,25 @@ def get_flags(flags): if b.type() != ATTR_TYPE.FILENAME_INFORMATION: continue try: - attr = FilenameAttribute(b.value(), 0, record) - a = attr.filename_type() + fnattr = FilenameAttribute(b.value(), 0, record) + a = fnattr.filename_type() print(" Type: %s" % (["POSIX", "WIN32", "DOS 8.3", "WIN32 + DOS 8.3"][a])) - print(" name: %s" % (str(attr.filename()))) - print(" attributes: " + ", ".join(get_flags(attr.flags()))) - print(" logical size: %d bytes" % (attr.logical_size())) - print(" physical size: %d bytes" % (attr.physical_size())) + print(" name: %s" % (str(fnattr.filename()))) + print(" attributes: " + ", ".join(get_flags(fnattr.flags()))) + print(" logical size: %d bytes" % (fnattr.logical_size())) + print(" physical size: %d bytes" % (fnattr.physical_size())) - crtime = attr.created_time().isoformat("T") + "Z" - mtime = attr.modified_time().isoformat("T") + "Z" - chtime = attr.changed_time().isoformat("T") + "Z" - atime = attr.accessed_time().isoformat("T") + "Z" + crtime = fnattr.created_time().isoformat("T") + "Z" + mtime = fnattr.modified_time().isoformat("T") + "Z" + chtime = fnattr.changed_time().isoformat("T") + "Z" + atime = fnattr.accessed_time().isoformat("T") + "Z" print(" modified: %s" % (mtime)) print(" accessed: %s" % (atime)) print(" changed: %s" % (chtime)) print(" birthed: %s" % (crtime)) - print(" parent ref: %d" % (MREF(attr.mft_parent_reference()))) - print(" parent seq: %d" % (MSEQNO(attr.mft_parent_reference()))) + print(" parent ref: %d" % (MREF(fnattr.mft_parent_reference()))) + print(" parent seq: %d" % (MSEQNO(fnattr.mft_parent_reference()))) except ZeroDivisionError: continue @@ -507,31 +509,41 @@ def get_flags(flags): if indxroot.non_resident() != 0: # This shouldn't happen. 
print("INDX_ROOT attribute is non-resident") - for e in indxroot.runlist().entries(): - print("Cluster %s, length %s" % (hex(e.offset()), hex(e.length()))) + for rle in indxroot.runlist().entries(): + print("Cluster %s, length %s" % (hex(rle.offset()), hex(rle.length()))) else: print("INDX_ROOT attribute is resident") irh = IndexRootHeader(indxroot.value(), 0, False) someentries = False - for e in irh.node_header().entries(): + for nhe in irh.node_header().entries(): if not someentries: print("INDX_ROOT entries:") someentries = True - print(" " + e.filename_information().filename()) + print(" " + nhe.filename_information().filename()) print( - " " + str(e.filename_information().logical_size()) + " bytes in size" + " " + + str(nhe.filename_information().logical_size()) + + " bytes in size" ) print( - " b " + e.filename_information().created_time().isoformat("T") + "Z" + " b " + + nhe.filename_information().created_time().isoformat("T") + + "Z" ) print( - " m " + e.filename_information().modified_time().isoformat("T") + "Z" + " m " + + nhe.filename_information().modified_time().isoformat("T") + + "Z" ) print( - " c " + e.filename_information().changed_time().isoformat("T") + "Z" + " c " + + nhe.filename_information().changed_time().isoformat("T") + + "Z" ) print( - " a " + e.filename_information().accessed_time().isoformat("T") + "Z" + " a " + + nhe.filename_information().accessed_time().isoformat("T") + + "Z" ) if not someentries: @@ -546,14 +558,14 @@ def get_flags(flags): print("INDX_ROOT slack entries: (none)") extractbuf = array.array("B") found_indxalloc = False - for attr in record.attributes(): - if attr.type() != ATTR_TYPE.INDEX_ALLOCATION: + for rattr in record.attributes(): + if rattr.type() != ATTR_TYPE.INDEX_ALLOCATION: continue found_indxalloc = True print("Found INDX_ALLOCATION attribute") - if attr.non_resident() != 0: + if rattr.non_resident() != 0: print("INDX_ALLOCATION is non-resident") - for offset, length in attr.runlist().runs(): + for offset, length in rattr.runlist().runs(): print("Cluster %s, length %s" % (hex(offset), hex(length))) print( " Using clustersize %s (%s) bytes and volume offset %s (%s) bytes: \n %s (%s) bytes for %s (%s) bytes" From 7a571190663b8979d5f4d7b46cd297b9cbd331ee Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 17:07:21 -0400 Subject: [PATCH 04/12] Add logic to handle Optional returned value Since `MFTRecord.standard_information()` could fail to find the SI Attribute, a block of logic that assumed the SI would be found is revised in this patch to handle the not-found case. 
Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 45 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index e12a917..9eaba0f 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -419,32 +419,31 @@ def get_flags(flags): attributes.append("has-view-index") return attributes - print( - " attributes: " - + ", ".join(get_flags(record.standard_information().attributes())) - ) + rsi = record.standard_information() + if rsi is None: + print(" SI not found") + else: + print(" attributes: " + ", ".join(get_flags(rsi.attributes()))) - crtime = record.standard_information().created_time().isoformat("T") + "Z" - mtime = record.standard_information().modified_time().isoformat("T") + "Z" - chtime = record.standard_information().changed_time().isoformat("T") + "Z" - atime = record.standard_information().accessed_time().isoformat("T") + "Z" + crtime = rsi.created_time().isoformat("T") + "Z" + mtime = rsi.modified_time().isoformat("T") + "Z" + chtime = rsi.changed_time().isoformat("T") + "Z" + atime = rsi.accessed_time().isoformat("T") + "Z" - print(" SI modified: %s" % (mtime)) - print(" SI accessed: %s" % (atime)) - print(" SI changed: %s" % (chtime)) - print(" SI birthed: %s" % (crtime)) + print(" SI modified: %s" % (mtime)) + print(" SI accessed: %s" % (atime)) + print(" SI changed: %s" % (chtime)) + print(" SI birthed: %s" % (crtime)) - try: - # since the fields are sequential, we can handle an exception half way through here - # and then ignore the remaining items. Dont have to worry about individual try/catches - print( - " owner id (quota info): %d" % (record.standard_information().owner_id()) - ) - print(" security id: %d" % (record.standard_information().security_id())) - print(" quota charged: %d" % (record.standard_information().quota_charged())) - print(" USN: %d" % (record.standard_information().usn())) - except StandardInformationFieldDoesNotExist: - pass + try: + # since the fields are sequential, we can handle an exception half way through here + # and then ignore the remaining items. 
Dont have to worry about individual try/catches + print(" owner id (quota info): %d" % (rsi.owner_id())) + print(" security id: %d" % (rsi.security_id())) + print(" quota charged: %d" % (rsi.quota_charged())) + print(" USN: %d" % (rsi.usn())) + except StandardInformationFieldDoesNotExist: + pass print("Filenames:") for b in record.attributes(): From 134e9e06bbb1eb926a0ef70692d0fb2c26cf2f33 Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 17:15:44 -0400 Subject: [PATCH 05/12] Unify type around if-else branching as Optional[MFTRecord] Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 9eaba0f..447ca2a 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -30,6 +30,7 @@ import re import sys from datetime import datetime +from typing import Optional from indxparse.BinaryParser import OverrunBufferException from indxparse.MFT import ( @@ -342,13 +343,14 @@ def print_indx_info(options): prefix=options.prefix, progress=options.progress, ) + record: Optional[MFTRecord] = None try: record_num = int(options.infomode) record_buf = f.mft_get_record_buf(record_num) record = MFTRecord(record_buf, 0, False) except ValueError: record = f.mft_get_record_by_path(options.infomode) - if not record: + if record is None: print("Did not find directory entry for " + options.infomode) return print("Found directory entry for: " + options.infomode) From 2082e1fefa78c89e5fb55564957bc764558d266d Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 17:31:43 -0400 Subject: [PATCH 06/12] Add logic to handle Optional returned value Since `MFTRecord.filename_information()` could fail to find a FN Attribute, a line of output that assumed the FN would be found is revised in this patch to handle the not-found case. Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 447ca2a..d6213b6 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -381,7 +381,9 @@ def print_indx_info(options): if data_attr and data_attr.non_resident() > 0: print(" size: %d bytes" % (data_attr.data_size())) else: - print(" size: %d bytes" % (record.filename_information().logical_size())) + rfni = record.filename_information() + if rfni is not None: + print(" size: %d bytes" % (rfni.logical_size())) def get_flags(flags): attributes = [] From 7cdf1f4071606557036377630292d834b1391c4b Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 17:35:19 -0400 Subject: [PATCH 07/12] Fix function name Type-review flagged that no method `Runlist.entries` exists. Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index d6213b6..0304883 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -512,7 +512,7 @@ def get_flags(flags): if indxroot.non_resident() != 0: # This shouldn't happen. 
print("INDX_ROOT attribute is non-resident") - for rle in indxroot.runlist().entries(): + for rle in indxroot.runlist()._entries(): print("Cluster %s, length %s" % (hex(rle.offset()), hex(rle.length()))) else: print("INDX_ROOT attribute is resident") From e494b1640de02eae503ba4968ad8bac62fa0e5bd Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 18:03:48 -0400 Subject: [PATCH 08/12] Type-annotate mft_get_record_buf and callers One of the methods flagged as needing type annotations by `mypy --strict indxparse/MFTINDX.py` is `NTFSFile.mft_get_record_buf`. This method is written in a way that, for one unfamiliar with calling context, could return a potentially null result. Review of the code paths to this function show that, at present, an `array.array` will be returned. But, `MFT.py` would require significant rearchitecting (likely, subclassing `NTFSFile`) to guarantee that. Rather than return `Optional[array]`, this patch adds a `raise ValueError` in accordance with one usage in `MFTINDX.py`. Handling `None` looked possible, but more complex, than a blunt fail, due to its usage in the recursive path-constructing function `mft_record_build_path`. Some of the other type additions in this patch are side effects from `mypy` flagging needs, and some are side effects from reviewing the call paths to `mft_get_record_buf`. Disclaimer: Participation by NIST in the creation of the documentation of mentioned software is not intended to imply a recommendation or endorsement by the National Institute of Standards and Technology, nor is it intended to imply that any specific software is necessarily the best available for the purpose. Signed-off-by: Alex Nelson --- indxparse/MFT.py | 15 ++++++++++++--- indxparse/MFTView.py | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/indxparse/MFT.py b/indxparse/MFT.py index e65047d..4ad073f 100755 --- a/indxparse/MFT.py +++ b/indxparse/MFT.py @@ -797,6 +797,7 @@ def __init__( super(FilenameAttribute, self).__init__(buf, offset) self.declare_field("qword", "mft_parent_reference", 0x0) + self.mft_parent_reference: typing.Callable[[], int] self.declare_field("filetime", "created_time") self.created_time: typing.Callable[[], datetime] @@ -1200,6 +1201,7 @@ def __init__( self.declare_field("qword", "lsn") self.declare_field("word", "sequence_number") + self.sequence_number: typing.Callable[[], int] self.declare_field("word", "link_count") @@ -1395,7 +1397,7 @@ def record_generator(self, start_at=0) -> typing.Iterator[MFTRecord]: logging.debug("Yielding record %d", count) yield record - def mft_get_record_buf(self, number): + def mft_get_record_buf(self, number: int) -> array.array: if self.filetype == "indx": return array.array("B", "") if self.filetype == "mft": @@ -1410,8 +1412,11 @@ def mft_get_record_buf(self, number): f.seek(self.mftoffset) f.seek(number * 1024, 1) return array.array("B", f.read(1024)) + raise ValueError( + "Retrieval method not defined for self.filetype = %s." 
% self.filetype + ) - def mft_get_record(self, number): + def mft_get_record(self, number: int) -> MFTRecord: buf = self.mft_get_record_buf(number) if buf == array.array("B", ""): raise InvalidMFTRecordNumber(number) @@ -1426,7 +1431,11 @@ def mft_get_record(self, number): + str(r.mft_record_number()) + str(r.flags()), ) - def mft_record_build_path(self, record, cycledetector=None): + def mft_record_build_path( + self, + record: MFTRecord, + cycledetector: typing.Optional[typing.Dict[int, bool]] = None, + ) -> str: if cycledetector is None: cycledetector = {} rec_num = record.mft_record_number() & 0xFFFFFFFFFFFF diff --git a/indxparse/MFTView.py b/indxparse/MFTView.py index f70acd7..7fbe251 100755 --- a/indxparse/MFTView.py +++ b/indxparse/MFTView.py @@ -191,7 +191,7 @@ class RecordConflict(Exception): def __init__(self, count): self.value = count - def add_node(mftfile, record): + def add_node(mftfile: NTFSFile, record: MFTRecord) -> None: """ Add the given record to the internal list of nodes, adding the parent nodes as appropriate. From 7372c50b99e2805f3eb1f908801da2720b2d94f1 Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 16:35:22 -0400 Subject: [PATCH 09/12] Add type annotations for methods currently called by MFTINDX.py These signatures were added to reduce type-issues reported by `mypy --strict indxparse/MFTINDX.py`. `MFT.py` typically only needed return types on methods to satisfy this objective. As a progress note: some hundreds of issues remain if `mypy --strict` is run against `MFT.py`. One less-obvious signature revision happened: `mft_get_record_by_path` now returns `Optional[MFTRecord]` instead of `Union[MFTRecord, bool]`, to remain consistent with the control flow around its sole caller, in `MFTINDX.py`. Disclaimer: Participation by NIST in the creation of the documentation of mentioned software is not intended to imply a recommendation or endorsement by the National Institute of Standards and Technology, nor is it intended to imply that any specific software is necessarily the best available for the purpose. A follow-on patch will add further signatures in `MFTINDX.py`. Signed-off-by: Alex Nelson --- indxparse/MFT.py | 53 ++++++++++++++++++++++++++++---------------- indxparse/MFTINDX.py | 6 ++--- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/indxparse/MFT.py b/indxparse/MFT.py index 4ad073f..0fcb060 100755 --- a/indxparse/MFT.py +++ b/indxparse/MFT.py @@ -501,6 +501,7 @@ def __init__( self.entry_list_start: typing.Callable[[], int] self.declare_field("dword", "entry_list_end") + self.entry_list_end: typing.Callable[[], int] self.declare_field("dword", "entry_list_allocation_end") self.entry_list_allocation_end: typing.Callable[[], int] @@ -514,7 +515,7 @@ def __init__( self.entry_list_allocation_end() - self.entry_list_start(), ) - def entries(self): + def entries(self) -> typing.Iterator[IndexEntry]: """ A generator that returns each INDX entry associated with this node. """ @@ -528,7 +529,7 @@ def entries(self): offset += e.length() yield e - def slack_entries(self): + def slack_entries(self) -> typing.Iterator[SlackIndexEntry]: """ A generator that yields INDX entries found in the slack space associated with this header. 
@@ -569,7 +570,7 @@ def __init__( self.declare_field("byte", "unused3") self._node_header_offset = self.current_field_offset() - def node_header(self): + def node_header(self) -> NTATTR_STANDARD_INDEX_HEADER: return NTATTR_STANDARD_INDEX_HEADER( self._buf, self.offset() + self._node_header_offset, self ) @@ -586,6 +587,7 @@ def __init__( super(IndexRecordHeader, self).__init__(buf, offset, parent) self.declare_field("dword", "magic", 0x0) + self.magic: typing.Callable[[], int] self.declare_field("word", "usa_offset") self.usa_offset: typing.Callable[[], int] @@ -600,7 +602,7 @@ def __init__( self._node_header_offset = self.current_field_offset() self.fixup(self.usa_count(), self.usa_offset()) - def node_header(self): + def node_header(self) -> NTATTR_STANDARD_INDEX_HEADER: return NTATTR_STANDARD_INDEX_HEADER( self._buf, self.offset() + self._node_header_offset, self ) @@ -669,6 +671,7 @@ def __init__( self.declare_field("qword", "mft_reference", 0x0) self.declare_field("word", "length") + self.length: typing.Callable[[], int] self.declare_field("word", "filename_information_length") self.filename_information_length: typing.Callable[[], int] @@ -681,12 +684,13 @@ def __init__( self.current_field_offset(), self.filename_information_length(), ) + self._off_filename_information_buffer: int self.declare_field( "qword", "child_vcn", align(self.current_field_offset(), 0x8) ) - def filename_information(self): + def filename_information(self) -> FilenameAttribute: return FilenameAttribute( self._buf, self.offset() + self._off_filename_information_buffer, self ) @@ -724,6 +728,7 @@ def __init__( self.accessed_time: typing.Callable[[], datetime] self.declare_field("dword", "attributes") + self.attributes: typing.Callable[[], int] self.declare_field("binary", "reserved", self.current_field_offset(), 0xC) @@ -741,7 +746,7 @@ def __init__( # def __len__(self): # return 0x42 + (self.filename_length() * 2) - def owner_id(self): + def owner_id(self) -> int: """ This is an explicit method because it may not exist in OSes under Win2k @@ -763,7 +768,7 @@ def security_id(self) -> int: except OverrunBufferException: raise StandardInformationFieldDoesNotExist("Security ID") - def quota_charged(self): + def quota_charged(self) -> int: """ This is an explicit method because it may not exist in OSes under Win2k @@ -774,7 +779,7 @@ def quota_charged(self): except OverrunBufferException: raise StandardInformationFieldDoesNotExist("Quota Charged") - def usn(self): + def usn(self) -> int: """ This is an explicit method because it may not exist in OSes under Win2k @@ -812,11 +817,13 @@ def __init__( self.accessed_time: typing.Callable[[], datetime] self.declare_field("qword", "physical_size") + self.physical_size: typing.Callable[[], int] self.declare_field("qword", "logical_size") self.logical_size: typing.Callable[[], int] self.declare_field("dword", "flags") + self.flags: typing.Callable[[], int] self.declare_field("dword", "reparse_value") @@ -902,10 +909,12 @@ def __init__( self.declare_field( "binary", "length_binary", self.current_field_offset(), self._length_length ) + self.length_binary: typing.Callable[[], array.array] self.declare_field( "binary", "offset_binary", self.current_field_offset(), self._offset_length ) + self.offset_binary: typing.Callable[[], array.array] @staticmethod def structure_size( @@ -948,11 +957,11 @@ def lsb2signednum(self, binary: array.array): ret *= -1 return ret - def offset(self): + def offset(self) -> int: # TODO(wb): make this run_offset return 
self.lsb2signednum(self.offset_binary()) - def length(self): + def length(self) -> int: # TODO(wb): make this run_offset return self.lsb2num(self.length_binary()) @@ -985,7 +994,7 @@ def structure_size( def __len__(self): return sum(map(len, self._entries())) - def _entries(self, length=None): + def _entries(self, length: typing.Optional[int] = None) -> typing.List[Runentry]: ret = [] offset = self.offset() entry = Runentry(self._buf, offset, self) @@ -999,7 +1008,7 @@ def _entries(self, length=None): entry = Runentry(self._buf, offset, self) return ret - def runs(self, length=None): + def runs(self, length=None) -> typing.Iterator[typing.Tuple[int, int]]: """ Yields tuples (volume offset, length). Recall that the entries are relative to one another @@ -1085,6 +1094,7 @@ def __init__( self.name_offset: typing.Callable[[], int] self.declare_field("word", "flags") + self.flags: typing.Callable[[], int] self.declare_field("word", "instance") @@ -1094,6 +1104,7 @@ def __init__( self.declare_field("qword", "highest_vcn") self.declare_field("word", "runlist_offset") + self.runlist_offset: typing.Callable[[], int] self.declare_field("byte", "compression_unit") @@ -1108,6 +1119,7 @@ def __init__( self.declare_field("byte", "reserved5") self.declare_field("qword", "allocated_size") + self.allocated_size: typing.Callable[[], int] self.declare_field("qword", "data_size") self.data_size: typing.Callable[[], int] @@ -1144,7 +1156,7 @@ def structure_size( def __len__(self): return self.size() - def runlist(self): + def runlist(self) -> Runlist: return Runlist(self._buf, self.offset() + self.runlist_offset(), self) def size(self): @@ -1160,14 +1172,14 @@ class MFT_RECORD_FLAGS: MFT_RECORD_IS_DIRECTORY = 0x2 -def MREF(mft_reference): +def MREF(mft_reference) -> int: """ Given a MREF/mft_reference, return the record number part. """ return mft_reference & 0xFFFFFFFFFFFF -def MSEQNO(mft_reference): +def MSEQNO(mft_reference) -> int: """ Given a MREF/mft_reference, return the sequence number part. 
""" @@ -1242,10 +1254,11 @@ def attributes(self) -> typing.Iterator[Attribute]: offset += len(a) yield a - def attribute(self, attr_type): + def attribute(self, attr_type) -> typing.Optional[Attribute]: for a in self.attributes(): if a.type() == attr_type: return a + return None def is_directory(self) -> bool: return bool(self.flags() & MFT_RECORD_FLAGS.MFT_RECORD_IS_DIRECTORY) @@ -1283,6 +1296,8 @@ def filename_information(self) -> typing.Optional[FilenameAttribute]: def standard_information(self) -> typing.Optional[StandardInformation]: try: attr = self.attribute(ATTR_TYPE.STANDARD_INFORMATION) + if attr is None: + return None return StandardInformation(attr.value(), 0, self) except AttributeError: return None @@ -1463,7 +1478,7 @@ def mft_record_build_path( cycledetector[rec_num] = True return self.mft_record_build_path(parent, cycledetector) + "\\" + fn.filename() - def mft_get_record_by_path(self, path): + def mft_get_record_by_path(self, path) -> typing.Optional[MFTRecord]: # TODO could optimize here by trying to use INDX buffers # and actually walk through the FS count = -1 @@ -1477,9 +1492,9 @@ def mft_get_record_by_path(self, path): if record_path.lower() != path.lower(): continue return record - return False + return None - def read(self, offset, length): + def read(self, offset, length) -> array.array: if self.filetype == "image": with open(self.filename, "rb") as f: f.seek(offset) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 0304883..1cdb140 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -30,7 +30,7 @@ import re import sys from datetime import datetime -from typing import Optional +from typing import List, Optional from indxparse.BinaryParser import OverrunBufferException from indxparse.MFT import ( @@ -245,7 +245,7 @@ def record_indx_entries_bodyfile(options, ntfsfile, record): return ret -def try_write(s): +def try_write(s: str) -> None: try: sys.stdout.write(s) except (UnicodeEncodeError, UnicodeDecodeError): @@ -385,7 +385,7 @@ def print_indx_info(options): if rfni is not None: print(" size: %d bytes" % (rfni.logical_size())) - def get_flags(flags): + def get_flags(flags) -> List[str]: attributes = [] if flags & 0x01: attributes.append("readonly") From eb1a3cd828b2b0c1ffc82aec45405f229ae27463 Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 20:14:38 -0400 Subject: [PATCH 10/12] Add type annotations for MFTINDX.py With this patch, `MFTINDX.py` passes `mypy --strict`. A follow-on patch will add this status to the testing. Disclaimer: Participation by NIST in the creation of the documentation of mentioned software is not intended to imply a recommendation or endorsement by the National Institute of Standards and Technology, nor is it intended to imply that any specific software is necessarily the best available for the purpose. 
Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 43 +++++++++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 1cdb140..4865b41 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -30,7 +30,7 @@ import re import sys from datetime import datetime -from typing import List, Optional +from typing import Any, List, Optional, Union from indxparse.BinaryParser import OverrunBufferException from indxparse.MFT import ( @@ -52,7 +52,14 @@ verbose = False -def information_bodyfile(path, size, inode, owner_id, info, attributes=None): +def information_bodyfile( + path: str, + size: int, + inode: int, + owner_id: int, + info: Union[FilenameAttribute, StandardInformation], + attributes: Optional[List[str]] = None, +) -> str: if not attributes: attributes = [] try: @@ -86,7 +93,9 @@ def information_bodyfile(path, size, inode, owner_id, info, attributes=None): ) -def record_bodyfile(ntfsfile, record, inode=None, attributes=None): +def record_bodyfile( + ntfsfile: NTFSFile, record: MFTRecord, attributes: Optional[List[str]] = None +) -> str: """ Return a bodyfile formatted string for the given MFT record. The string contains metadata for the one file described by the record. @@ -162,7 +171,11 @@ def record_bodyfile(ntfsfile, record, inode=None, attributes=None): return ret -def node_header_bodyfile(options, node_header, basepath): +def node_header_bodyfile( + options: argparse.Namespace, + node_header: NTATTR_STANDARD_INDEX_HEADER, + basepath: str, +) -> str: """ Returns a bodyfile formatted string for all INDX entries following the given INDX node header. @@ -189,7 +202,11 @@ def node_header_bodyfile(options, node_header, basepath): return ret -def record_indx_entries_bodyfile(options, ntfsfile, record): +def record_indx_entries_bodyfile( + options: argparse.Namespace, + ntfsfile: NTFSFile, + record: MFTRecord, +) -> str: """ Returns a bodyfile formatted string for all INDX entries associated with the given MFT record @@ -254,7 +271,11 @@ def try_write(s: str) -> None: ) -def print_nonresident_indx_bodyfile(options, buf, basepath=""): +def print_nonresident_indx_bodyfile( + options: argparse.Namespace, + buf: array.array[Any], + basepath: str = "", +) -> None: offset = 0 try: irh = IndexRecordHeader(buf, offset, False) @@ -274,7 +295,9 @@ def print_nonresident_indx_bodyfile(options, buf, basepath=""): return -def print_bodyfile(options): +def print_bodyfile( + options: argparse.Namespace, +) -> None: if options.filetype == "mft" or options.filetype == "image": ntfs_file = NTFSFile( clustersize=options.clustersize, @@ -334,7 +357,7 @@ def print_bodyfile(options): print_nonresident_indx_bodyfile(options, buf) -def print_indx_info(options): +def print_indx_info(options: argparse.Namespace) -> None: f = NTFSFile( clustersize=options.clustersize, filename=options.filename, @@ -385,7 +408,7 @@ def print_indx_info(options): if rfni is not None: print(" size: %d bytes" % (rfni.logical_size())) - def get_flags(flags) -> List[str]: + def get_flags(flags: int) -> List[str]: attributes = [] if flags & 0x01: attributes.append("readonly") @@ -598,7 +621,7 @@ def get_flags(flags) -> List[str]: return -def main(): +def main() -> None: parser = argparse.ArgumentParser(description="Parse NTFS " "filesystem structures.") parser.add_argument( "-t", From 70898bd0eb1cc24dcb80e613811d6ad274fde21f Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 20:19:56 -0400 Subject: [PATCH 11/12] Have CI review 
MFTINDX.py with mypy --strict This is in partial satisfaction of adding mypy type checking, noted on Issue 38. Disclaimer: Participation by NIST in the creation of the documentation of mentioned software is not intended to imply a recommendation or endorsement by the National Institute of Standards and Technology, nor is it intended to imply that any specific software is necessarily the best available for the purpose. References: * https://github.com/williballenthin/INDXParse/issues/38 Signed-off-by: Alex Nelson --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index ca297c7..ceb3659 100644 --- a/Makefile +++ b/Makefile @@ -50,6 +50,10 @@ check-mypy: \ source venv/bin/activate \ && mypy \ indxparse + source venv/bin/activate \ + && mypy \ + --strict \ + indxparse/MFTINDX.py check-third_party: $(MAKE) \ From 21c24e268dbd085727b386808d624e39987ae8cd Mon Sep 17 00:00:00 2001 From: Alex Nelson Date: Mon, 14 Aug 2023 20:30:18 -0400 Subject: [PATCH 12/12] Remove reliance on options namespace in MFTINDX.py This is a continuation of the coding style trialed in Pull Request 62. References: * https://github.com/williballenthin/INDXParse/pull/62 Signed-off-by: Alex Nelson --- indxparse/MFTINDX.py | 205 +++++++++++++++++++++++++++++-------------- 1 file changed, 138 insertions(+), 67 deletions(-) diff --git a/indxparse/MFTINDX.py b/indxparse/MFTINDX.py index 4865b41..b7fa4ea 100755 --- a/indxparse/MFTINDX.py +++ b/indxparse/MFTINDX.py @@ -172,9 +172,12 @@ def record_bodyfile( def node_header_bodyfile( - options: argparse.Namespace, node_header: NTATTR_STANDARD_INDEX_HEADER, basepath: str, + *args: Any, + indxlist: bool, + slack: bool, + **kwargs: Any, ) -> str: """ Returns a bodyfile formatted string for all INDX entries following the @@ -182,7 +185,7 @@ def node_header_bodyfile( """ ret = "" attrs = ["filename", "INDX"] - if options.indxlist: + if indxlist: for e in node_header.entries(): path = basepath + "\\" + e.filename_information().filename() size = e.filename_information().logical_size() @@ -191,7 +194,7 @@ def node_header_bodyfile( path, size, inode, 0, e.filename_information(), attributes=attrs ) attrs.append("slack") - if options.slack: + if slack: for e in node_header.slack_entries(): path = basepath + "\\" + e.filename_information().filename() size = e.filename_information().logical_size() @@ -203,9 +206,14 @@ def node_header_bodyfile( def record_indx_entries_bodyfile( - options: argparse.Namespace, ntfsfile: NTFSFile, record: MFTRecord, + *args: Any, + clustersize: int, + indxlist: bool, + offset: int, + slack: bool, + **kwargs: Any, ) -> str: """ Returns a bodyfile formatted string for all INDX entries associated with @@ -225,7 +233,7 @@ def record_indx_entries_bodyfile( else: iroh = IndexRootHeader(indxroot.value(), 0, False) nh = iroh.node_header() - ret += node_header_bodyfile(options, nh, basepath) + ret += node_header_bodyfile(nh, basepath, indxlist=indxlist, slack=slack) extractbuf = array.array("B") for attr in record.attributes(): if attr.type() != ATTR_TYPE.INDEX_ALLOCATION: @@ -233,8 +241,8 @@ def record_indx_entries_bodyfile( if attr.non_resident() != 0: for offset, length in attr.runlist().runs(): try: - ooff = offset * options.clustersize + options.offset - llen = length * options.clustersize + ooff = offset * clustersize + offset + llen = length * clustersize extractbuf += f.read(ooff, llen) except IOError: pass @@ -250,9 +258,9 @@ def record_indx_entries_bodyfile( # TODO could miss something if there is an empty, valid record at the 
end while ireh.magic() == 0x58444E49: nh = ireh.node_header() - ret += node_header_bodyfile(options, nh, basepath) + ret += node_header_bodyfile(nh, basepath, indxlist=indxlist, slack=slack) # TODO get this from the boot record - offset += options.clustersize + offset += clustersize if offset + 4096 > len(extractbuf): # TODO make this INDX record size return ret try: @@ -272,9 +280,13 @@ def try_write(s: str) -> None: def print_nonresident_indx_bodyfile( - options: argparse.Namespace, buf: array.array[Any], basepath: str = "", + *args: Any, + clustersize: int, + indxlist: bool, + slack: bool, + **kwargs: Any, ) -> None: offset = 0 try: @@ -284,8 +296,8 @@ def print_nonresident_indx_bodyfile( # TODO could miss something if there is an empty, valid record at the end while irh.magic() == 0x58444E49: nh = irh.node_header() - try_write(node_header_bodyfile(options, nh, basepath)) - offset += options.clustersize + try_write(node_header_bodyfile(nh, basepath, indxlist=indxlist, slack=slack)) + offset += clustersize if offset + 4096 > len(buf): # TODO make this INDX record size return try: @@ -296,41 +308,62 @@ def print_nonresident_indx_bodyfile( def print_bodyfile( - options: argparse.Namespace, + *args: Any, + clustersize: int, + deleted: bool, + filename: str, + filetype: str, + filter_pattern: str, + indxlist: bool, + mftlist: bool, + offset: int, + prefix: str, + progress: bool, + slack: bool, + **kwargs: Any, ) -> None: - if options.filetype == "mft" or options.filetype == "image": + if filetype == "mft" or filetype == "image": ntfs_file = NTFSFile( - clustersize=options.clustersize, - filename=options.filename, - filetype=options.filetype, - offset=options.offset, - prefix=options.prefix, - progress=options.progress, + clustersize=clustersize, + filename=filename, + filetype=filetype, + offset=offset, + prefix=prefix, + progress=progress, ) - if options.filter: - refilter = re.compile(options.filter) + if filter_pattern: + refilter = re.compile(filter_pattern) for record in ntfs_file.record_generator(): logging.debug("Considering MFT record %s" % (record.mft_record_number())) try: if record.magic() != 0x454C4946: logging.debug("Record has a bad magic value") continue - if options.filter: + if filter_pattern: path = ntfs_file.mft_record_build_path(record, {}) if not refilter.search(path): logging.debug( "Skipping listing path " "due to regex filter: " + path ) continue - if record.is_active() and options.mftlist: + if record.is_active() and mftlist: try_write(record_bodyfile(ntfs_file, record)) - if options.indxlist or options.slack: - try_write(record_indx_entries_bodyfile(options, ntfs_file, record)) - elif (not record.is_active()) and options.deleted: + if indxlist or slack: + try_write( + record_indx_entries_bodyfile( + ntfs_file, + record, + clustersize=clustersize, + indxlist=indxlist, + offset=offset, + slack=slack, + ) + ) + elif (not record.is_active()) and deleted: try_write( record_bodyfile(ntfs_file, record, attributes=["deleted"]) ) - if options.filetype == "image" and (options.indxlist or options.slack): + if filetype == "image" and (indxlist or slack): extractbuf = array.array("B") found_indxalloc = False for attr in record.attributes(): @@ -339,44 +372,61 @@ def print_bodyfile( found_indxalloc = True if attr.non_resident() != 0: for offset, length in attr.runlist().runs(): - ooff = offset * options.clustersize + options.offset - llen = length * options.clustersize + ooff = offset * clustersize + offset + llen = length * clustersize extractbuf += ntfs_file.read(ooff, llen) 
else: pass # This shouldn't happen. if found_indxalloc and len(extractbuf) > 0: path = ntfs_file.mft_record_build_path(record, {}) print_nonresident_indx_bodyfile( - options, extractbuf, basepath=path + extractbuf, + basepath=path, + clustersize=clustersize, + indxlist=indxlist, + slack=slack, ) except InvalidAttributeException: pass - elif options.filetype == "indx": - with open(options.filename, "rb") as fh: + elif filetype == "indx": + with open(filename, "rb") as fh: buf = array.array("B", fh.read()) - print_nonresident_indx_bodyfile(options, buf) + print_nonresident_indx_bodyfile( + buf, clustersize=clustersize, indxlist=indxlist, slack=slack + ) -def print_indx_info(options: argparse.Namespace) -> None: +def print_indx_info( + *args: Any, + clustersize: int, + extract: str, + filename: str, + filetype: str, + infomode: str, + offset: int, + prefix: str, + progress: bool, + **kwargs: Any, +) -> None: f = NTFSFile( - clustersize=options.clustersize, - filename=options.filename, - filetype=options.filetype, - offset=options.offset, - prefix=options.prefix, - progress=options.progress, + clustersize=clustersize, + filename=filename, + filetype=filetype, + offset=offset, + prefix=prefix, + progress=progress, ) record: Optional[MFTRecord] = None try: - record_num = int(options.infomode) + record_num = int(infomode) record_buf = f.mft_get_record_buf(record_num) record = MFTRecord(record_buf, 0, False) except ValueError: - record = f.mft_get_record_by_path(options.infomode) + record = f.mft_get_record_by_path(infomode) if record is None: - print("Did not find directory entry for " + options.infomode) + print("Did not find directory entry for " + infomode) return - print("Found directory entry for: " + options.infomode) + print("Found directory entry for: " + infomode) if record.magic() != 0x454C4946: if record.magic() == int("0xBAAD", 0x10): @@ -516,10 +566,10 @@ def get_flags(flags: int) -> List[str]: print( " %s (%s) bytes for %s (%s) bytes" % ( - offset * options.clustersize, - hex(offset * options.clustersize), - length * options.clustersize, - hex(length * options.clustersize), + offset * clustersize, + hex(offset * clustersize), + length * clustersize, + hex(length * clustersize), ) ) else: @@ -596,18 +646,18 @@ def get_flags(flags: int) -> List[str]: print( " Using clustersize %s (%s) bytes and volume offset %s (%s) bytes: \n %s (%s) bytes for %s (%s) bytes" % ( - options.clustersize, - hex(options.clustersize), - options.offset, - hex(options.offset), - (offset * options.clustersize) + options.offset, - hex((offset * options.clustersize) + options.offset), - length * options.clustersize, - hex(length * options.clustersize), + clustersize, + hex(clustersize), + offset, + hex(offset), + (offset * clustersize) + offset, + hex((offset * clustersize) + offset), + length * clustersize, + hex(length * clustersize), ) ) - ooff = offset * options.clustersize + options.offset - llen = length * options.clustersize + ooff = offset * clustersize + offset + llen = length * clustersize extractbuf += f.read(ooff, llen) else: # This shouldn't happen. 
@@ -615,8 +665,8 @@ def get_flags(flags: int) -> List[str]: if not found_indxalloc: print("No INDX_ALLOCATION attribute found") return - if options.extract: - with open(options.extract, "wb") as g: + if extract: + with open(extract, "wb") as g: g.write(extractbuf) return @@ -696,7 +746,7 @@ def main() -> None: action="store", metavar="regex", nargs=1, - dest="filter", + dest="filter_pattern", help="Only consider entries whose path " "matches this regular expression", ) parser.add_argument( @@ -837,19 +887,40 @@ def main() -> None: ): logging.error("You must choose a mode (-i/-l/-s/-m/-d)") - if results.filter: - results.filter = results.filter[0] + if results.filter_pattern: + results.filter_pattern = results.filter_pattern[0] logging.info( "Asked to only list file entry information " - "for paths matching the regular expression: " + results.filter + "for paths matching the regular expression: " + results.filter_pattern ) if results.infomode: logging.warning("This filter has no meaning with information mode (-i)") if results.infomode: - print_indx_info(results) + print_indx_info( + clustersize=results.clustersize, + extract=results.extract, + filename=results.filename, + filetype=results.filetype, + infomode=results.infomode, + offset=results.offset, + prefix=results.prefix, + progress=results.progress, + ) elif results.indxlist or results.slack or results.mftlist or results.deleted: - print_bodyfile(results) + print_bodyfile( + clustersize=results.clustersize, + deleted=results.deleted, + filename=results.filename, + filetype=results.filetype, + filter_pattern=results.filter_pattern, + indxlist=results.indxlist, + mftlist=results.mftlist, + offset=results.offset, + prefix=results.prefix, + progress=results.progress, + slack=results.slack, + ) if __name__ == "__main__":