Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review MFTINDX.py with mypy --strict #67

4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ check-mypy: \
source venv/bin/activate \
&& mypy \
indxparse
source venv/bin/activate \
&& mypy \
--strict \
indxparse/MFTINDX.py

check-third_party:
$(MAKE) \
Expand Down
68 changes: 46 additions & 22 deletions indxparse/MFT.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ def __init__(
self.entry_list_start: typing.Callable[[], int]

self.declare_field("dword", "entry_list_end")
self.entry_list_end: typing.Callable[[], int]

self.declare_field("dword", "entry_list_allocation_end")
self.entry_list_allocation_end: typing.Callable[[], int]
Expand All @@ -514,7 +515,7 @@ def __init__(
self.entry_list_allocation_end() - self.entry_list_start(),
)

def entries(self):
def entries(self) -> typing.Iterator[IndexEntry]:
"""
A generator that returns each INDX entry associated with this node.
"""
Expand All @@ -528,7 +529,7 @@ def entries(self):
offset += e.length()
yield e

def slack_entries(self):
def slack_entries(self) -> typing.Iterator[SlackIndexEntry]:
"""
A generator that yields INDX entries found in the slack space
associated with this header.
Expand Down Expand Up @@ -569,7 +570,7 @@ def __init__(
self.declare_field("byte", "unused3")
self._node_header_offset = self.current_field_offset()

def node_header(self):
def node_header(self) -> NTATTR_STANDARD_INDEX_HEADER:
return NTATTR_STANDARD_INDEX_HEADER(
self._buf, self.offset() + self._node_header_offset, self
)
Expand All @@ -586,6 +587,7 @@ def __init__(
super(IndexRecordHeader, self).__init__(buf, offset, parent)

self.declare_field("dword", "magic", 0x0)
self.magic: typing.Callable[[], int]

self.declare_field("word", "usa_offset")
self.usa_offset: typing.Callable[[], int]
Expand All @@ -600,7 +602,7 @@ def __init__(
self._node_header_offset = self.current_field_offset()
self.fixup(self.usa_count(), self.usa_offset())

def node_header(self):
def node_header(self) -> NTATTR_STANDARD_INDEX_HEADER:
return NTATTR_STANDARD_INDEX_HEADER(
self._buf, self.offset() + self._node_header_offset, self
)
Expand Down Expand Up @@ -669,6 +671,7 @@ def __init__(
self.declare_field("qword", "mft_reference", 0x0)

self.declare_field("word", "length")
self.length: typing.Callable[[], int]

self.declare_field("word", "filename_information_length")
self.filename_information_length: typing.Callable[[], int]
Expand All @@ -681,12 +684,13 @@ def __init__(
self.current_field_offset(),
self.filename_information_length(),
)
self._off_filename_information_buffer: int

self.declare_field(
"qword", "child_vcn", align(self.current_field_offset(), 0x8)
)

def filename_information(self):
def filename_information(self) -> FilenameAttribute:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type hints are nice additions. Thanks!

return FilenameAttribute(
self._buf, self.offset() + self._off_filename_information_buffer, self
)
Expand Down Expand Up @@ -724,6 +728,7 @@ def __init__(
self.accessed_time: typing.Callable[[], datetime]

self.declare_field("dword", "attributes")
self.attributes: typing.Callable[[], int]

self.declare_field("binary", "reserved", self.current_field_offset(), 0xC)

Expand All @@ -741,7 +746,7 @@ def __init__(
# def __len__(self):
# return 0x42 + (self.filename_length() * 2)

def owner_id(self):
def owner_id(self) -> int:
"""
This is an explicit method because it may not exist in OSes under Win2k

Expand All @@ -763,7 +768,7 @@ def security_id(self) -> int:
except OverrunBufferException:
raise StandardInformationFieldDoesNotExist("Security ID")

def quota_charged(self):
def quota_charged(self) -> int:
"""
This is an explicit method because it may not exist in OSes under Win2k

Expand All @@ -774,7 +779,7 @@ def quota_charged(self):
except OverrunBufferException:
raise StandardInformationFieldDoesNotExist("Quota Charged")

def usn(self):
def usn(self) -> int:
"""
This is an explicit method because it may not exist in OSes under Win2k

Expand All @@ -797,6 +802,7 @@ def __init__(
super(FilenameAttribute, self).__init__(buf, offset)

self.declare_field("qword", "mft_parent_reference", 0x0)
self.mft_parent_reference: typing.Callable[[], int]

self.declare_field("filetime", "created_time")
self.created_time: typing.Callable[[], datetime]
Expand All @@ -811,11 +817,13 @@ def __init__(
self.accessed_time: typing.Callable[[], datetime]

self.declare_field("qword", "physical_size")
self.physical_size: typing.Callable[[], int]

self.declare_field("qword", "logical_size")
self.logical_size: typing.Callable[[], int]

self.declare_field("dword", "flags")
self.flags: typing.Callable[[], int]

self.declare_field("dword", "reparse_value")

Expand Down Expand Up @@ -901,10 +909,12 @@ def __init__(
self.declare_field(
"binary", "length_binary", self.current_field_offset(), self._length_length
)
self.length_binary: typing.Callable[[], array.array]

self.declare_field(
"binary", "offset_binary", self.current_field_offset(), self._offset_length
)
self.offset_binary: typing.Callable[[], array.array]

@staticmethod
def structure_size(
Expand Down Expand Up @@ -947,11 +957,11 @@ def lsb2signednum(self, binary: array.array):
ret *= -1
return ret

def offset(self):
def offset(self) -> int:
# TODO(wb): make this run_offset
return self.lsb2signednum(self.offset_binary())

def length(self):
def length(self) -> int:
# TODO(wb): make this run_offset
return self.lsb2num(self.length_binary())

Expand Down Expand Up @@ -984,7 +994,7 @@ def structure_size(
def __len__(self):
return sum(map(len, self._entries()))

def _entries(self, length=None):
def _entries(self, length: typing.Optional[int] = None) -> typing.List[Runentry]:
ret = []
offset = self.offset()
entry = Runentry(self._buf, offset, self)
Expand All @@ -998,7 +1008,7 @@ def _entries(self, length=None):
entry = Runentry(self._buf, offset, self)
return ret

def runs(self, length=None):
def runs(self, length=None) -> typing.Iterator[typing.Tuple[int, int]]:
"""
Yields tuples (volume offset, length).
Recall that the entries are relative to one another
Expand Down Expand Up @@ -1084,6 +1094,7 @@ def __init__(
self.name_offset: typing.Callable[[], int]

self.declare_field("word", "flags")
self.flags: typing.Callable[[], int]

self.declare_field("word", "instance")

Expand All @@ -1093,6 +1104,7 @@ def __init__(
self.declare_field("qword", "highest_vcn")

self.declare_field("word", "runlist_offset")
self.runlist_offset: typing.Callable[[], int]

self.declare_field("byte", "compression_unit")

Expand All @@ -1107,6 +1119,7 @@ def __init__(
self.declare_field("byte", "reserved5")

self.declare_field("qword", "allocated_size")
self.allocated_size: typing.Callable[[], int]

self.declare_field("qword", "data_size")
self.data_size: typing.Callable[[], int]
Expand Down Expand Up @@ -1143,7 +1156,7 @@ def structure_size(
def __len__(self):
return self.size()

def runlist(self):
def runlist(self) -> Runlist:
return Runlist(self._buf, self.offset() + self.runlist_offset(), self)

def size(self):
Expand All @@ -1159,14 +1172,14 @@ class MFT_RECORD_FLAGS:
MFT_RECORD_IS_DIRECTORY = 0x2


def MREF(mft_reference):
def MREF(mft_reference) -> int:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def MREF(mft_reference) -> int:
def MREF(mft_reference: int) -> int:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this suggestion, by the way. I recall that mypy --strict used to require arguments along the information flow path to be annotated as well, but it looks like the review strictness now only reaches return types when checking imported modules. I purposefully left the parameter unannotated to see whether that was a quirk of my environment, but, nope.

"""
Given a MREF/mft_reference, return the record number part.
"""
return mft_reference & 0xFFFFFFFFFFFF


def MSEQNO(mft_reference):
def MSEQNO(mft_reference) -> int:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def MSEQNO(mft_reference) -> int:
def MSEQNO(mft_reference: int) -> int:

"""
Given a MREF/mft_reference, return the sequence number part.
"""
Expand Down Expand Up @@ -1200,6 +1213,7 @@ def __init__(
self.declare_field("qword", "lsn")

self.declare_field("word", "sequence_number")
self.sequence_number: typing.Callable[[], int]

self.declare_field("word", "link_count")

Expand Down Expand Up @@ -1240,10 +1254,11 @@ def attributes(self) -> typing.Iterator[Attribute]:
offset += len(a)
yield a

def attribute(self, attr_type):
def attribute(self, attr_type) -> typing.Optional[Attribute]:
for a in self.attributes():
if a.type() == attr_type:
return a
return None

def is_directory(self) -> bool:
return bool(self.flags() & MFT_RECORD_FLAGS.MFT_RECORD_IS_DIRECTORY)
Expand Down Expand Up @@ -1281,6 +1296,8 @@ def filename_information(self) -> typing.Optional[FilenameAttribute]:
def standard_information(self) -> typing.Optional[StandardInformation]:
try:
attr = self.attribute(ATTR_TYPE.STANDARD_INFORMATION)
if attr is None:
return None
return StandardInformation(attr.value(), 0, self)
except AttributeError:
return None
Expand Down Expand Up @@ -1395,7 +1412,7 @@ def record_generator(self, start_at=0) -> typing.Iterator[MFTRecord]:
logging.debug("Yielding record %d", count)
yield record

def mft_get_record_buf(self, number):
def mft_get_record_buf(self, number: int) -> array.array:
if self.filetype == "indx":
return array.array("B", "")
if self.filetype == "mft":
Expand All @@ -1410,8 +1427,11 @@ def mft_get_record_buf(self, number):
f.seek(self.mftoffset)
f.seek(number * 1024, 1)
return array.array("B", f.read(1024))
raise ValueError(
"Retrieval method not defined for self.filetype = %s." % self.filetype
)

def mft_get_record(self, number):
def mft_get_record(self, number: int) -> MFTRecord:
buf = self.mft_get_record_buf(number)
if buf == array.array("B", ""):
raise InvalidMFTRecordNumber(number)
Expand All @@ -1426,7 +1446,11 @@ def mft_get_record(self, number):
+ str(r.mft_record_number())
+ str(r.flags()),
)
def mft_record_build_path(self, record, cycledetector=None):
def mft_record_build_path(
self,
record: MFTRecord,
cycledetector: typing.Optional[typing.Dict[int, bool]] = None,
) -> str:
if cycledetector is None:
cycledetector = {}
rec_num = record.mft_record_number() & 0xFFFFFFFFFFFF
Expand Down Expand Up @@ -1454,7 +1478,7 @@ def mft_record_build_path(self, record, cycledetector=None):
cycledetector[rec_num] = True
return self.mft_record_build_path(parent, cycledetector) + "\\" + fn.filename()

def mft_get_record_by_path(self, path):
def mft_get_record_by_path(self, path) -> typing.Optional[MFTRecord]:
# TODO could optimize here by trying to use INDX buffers
# and actually walk through the FS
count = -1
Expand All @@ -1468,9 +1492,9 @@ def mft_get_record_by_path(self, path):
if record_path.lower() != path.lower():
continue
return record
return False
return None

def read(self, offset, length):
def read(self, offset, length) -> array.array:
if self.filetype == "image":
with open(self.filename, "rb") as f:
f.seek(offset)
Expand Down
Loading