-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Review MFTINDX.py
with mypy --strict
#67
Changes from all commits
b5b05a8
f45b391
0aee6b4
7a57119
134e9e0
2082e1f
7cdf1f4
e494b16
7372c50
eb1a3cd
70898bd
21c24e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -501,6 +501,7 @@ def __init__( | |||||
self.entry_list_start: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("dword", "entry_list_end") | ||||||
self.entry_list_end: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("dword", "entry_list_allocation_end") | ||||||
self.entry_list_allocation_end: typing.Callable[[], int] | ||||||
|
@@ -514,7 +515,7 @@ def __init__( | |||||
self.entry_list_allocation_end() - self.entry_list_start(), | ||||||
) | ||||||
|
||||||
def entries(self): | ||||||
def entries(self) -> typing.Iterator[IndexEntry]: | ||||||
""" | ||||||
A generator that returns each INDX entry associated with this node. | ||||||
""" | ||||||
|
@@ -528,7 +529,7 @@ def entries(self): | |||||
offset += e.length() | ||||||
yield e | ||||||
|
||||||
def slack_entries(self): | ||||||
def slack_entries(self) -> typing.Iterator[SlackIndexEntry]: | ||||||
""" | ||||||
A generator that yields INDX entries found in the slack space | ||||||
associated with this header. | ||||||
|
@@ -569,7 +570,7 @@ def __init__( | |||||
self.declare_field("byte", "unused3") | ||||||
self._node_header_offset = self.current_field_offset() | ||||||
|
||||||
def node_header(self): | ||||||
def node_header(self) -> NTATTR_STANDARD_INDEX_HEADER: | ||||||
return NTATTR_STANDARD_INDEX_HEADER( | ||||||
self._buf, self.offset() + self._node_header_offset, self | ||||||
) | ||||||
|
@@ -586,6 +587,7 @@ def __init__( | |||||
super(IndexRecordHeader, self).__init__(buf, offset, parent) | ||||||
|
||||||
self.declare_field("dword", "magic", 0x0) | ||||||
self.magic: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("word", "usa_offset") | ||||||
self.usa_offset: typing.Callable[[], int] | ||||||
|
@@ -600,7 +602,7 @@ def __init__( | |||||
self._node_header_offset = self.current_field_offset() | ||||||
self.fixup(self.usa_count(), self.usa_offset()) | ||||||
|
||||||
def node_header(self): | ||||||
def node_header(self) -> NTATTR_STANDARD_INDEX_HEADER: | ||||||
return NTATTR_STANDARD_INDEX_HEADER( | ||||||
self._buf, self.offset() + self._node_header_offset, self | ||||||
) | ||||||
|
@@ -669,6 +671,7 @@ def __init__( | |||||
self.declare_field("qword", "mft_reference", 0x0) | ||||||
|
||||||
self.declare_field("word", "length") | ||||||
self.length: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("word", "filename_information_length") | ||||||
self.filename_information_length: typing.Callable[[], int] | ||||||
|
@@ -681,12 +684,13 @@ def __init__( | |||||
self.current_field_offset(), | ||||||
self.filename_information_length(), | ||||||
) | ||||||
self._off_filename_information_buffer: int | ||||||
|
||||||
self.declare_field( | ||||||
"qword", "child_vcn", align(self.current_field_offset(), 0x8) | ||||||
) | ||||||
|
||||||
def filename_information(self): | ||||||
def filename_information(self) -> FilenameAttribute: | ||||||
return FilenameAttribute( | ||||||
self._buf, self.offset() + self._off_filename_information_buffer, self | ||||||
) | ||||||
|
@@ -724,6 +728,7 @@ def __init__( | |||||
self.accessed_time: typing.Callable[[], datetime] | ||||||
|
||||||
self.declare_field("dword", "attributes") | ||||||
self.attributes: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("binary", "reserved", self.current_field_offset(), 0xC) | ||||||
|
||||||
|
@@ -741,7 +746,7 @@ def __init__( | |||||
# def __len__(self): | ||||||
# return 0x42 + (self.filename_length() * 2) | ||||||
|
||||||
def owner_id(self): | ||||||
def owner_id(self) -> int: | ||||||
""" | ||||||
This is an explicit method because it may not exist in OSes under Win2k | ||||||
|
||||||
|
@@ -763,7 +768,7 @@ def security_id(self) -> int: | |||||
except OverrunBufferException: | ||||||
raise StandardInformationFieldDoesNotExist("Security ID") | ||||||
|
||||||
def quota_charged(self): | ||||||
def quota_charged(self) -> int: | ||||||
""" | ||||||
This is an explicit method because it may not exist in OSes under Win2k | ||||||
|
||||||
|
@@ -774,7 +779,7 @@ def quota_charged(self): | |||||
except OverrunBufferException: | ||||||
raise StandardInformationFieldDoesNotExist("Quota Charged") | ||||||
|
||||||
def usn(self): | ||||||
def usn(self) -> int: | ||||||
""" | ||||||
This is an explicit method because it may not exist in OSes under Win2k | ||||||
|
||||||
|
@@ -797,6 +802,7 @@ def __init__( | |||||
super(FilenameAttribute, self).__init__(buf, offset) | ||||||
|
||||||
self.declare_field("qword", "mft_parent_reference", 0x0) | ||||||
self.mft_parent_reference: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("filetime", "created_time") | ||||||
self.created_time: typing.Callable[[], datetime] | ||||||
|
@@ -811,11 +817,13 @@ def __init__( | |||||
self.accessed_time: typing.Callable[[], datetime] | ||||||
|
||||||
self.declare_field("qword", "physical_size") | ||||||
self.physical_size: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("qword", "logical_size") | ||||||
self.logical_size: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("dword", "flags") | ||||||
self.flags: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("dword", "reparse_value") | ||||||
|
||||||
|
@@ -901,10 +909,12 @@ def __init__( | |||||
self.declare_field( | ||||||
"binary", "length_binary", self.current_field_offset(), self._length_length | ||||||
) | ||||||
self.length_binary: typing.Callable[[], array.array] | ||||||
|
||||||
self.declare_field( | ||||||
"binary", "offset_binary", self.current_field_offset(), self._offset_length | ||||||
) | ||||||
self.offset_binary: typing.Callable[[], array.array] | ||||||
|
||||||
@staticmethod | ||||||
def structure_size( | ||||||
|
@@ -947,11 +957,11 @@ def lsb2signednum(self, binary: array.array): | |||||
ret *= -1 | ||||||
return ret | ||||||
|
||||||
def offset(self): | ||||||
def offset(self) -> int: | ||||||
# TODO(wb): make this run_offset | ||||||
return self.lsb2signednum(self.offset_binary()) | ||||||
|
||||||
def length(self): | ||||||
def length(self) -> int: | ||||||
# TODO(wb): make this run_offset | ||||||
return self.lsb2num(self.length_binary()) | ||||||
|
||||||
|
@@ -984,7 +994,7 @@ def structure_size( | |||||
def __len__(self): | ||||||
return sum(map(len, self._entries())) | ||||||
|
||||||
def _entries(self, length=None): | ||||||
def _entries(self, length: typing.Optional[int] = None) -> typing.List[Runentry]: | ||||||
ret = [] | ||||||
offset = self.offset() | ||||||
entry = Runentry(self._buf, offset, self) | ||||||
|
@@ -998,7 +1008,7 @@ def _entries(self, length=None): | |||||
entry = Runentry(self._buf, offset, self) | ||||||
return ret | ||||||
|
||||||
def runs(self, length=None): | ||||||
def runs(self, length=None) -> typing.Iterator[typing.Tuple[int, int]]: | ||||||
""" | ||||||
Yields tuples (volume offset, length). | ||||||
Recall that the entries are relative to one another | ||||||
|
@@ -1084,6 +1094,7 @@ def __init__( | |||||
self.name_offset: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("word", "flags") | ||||||
self.flags: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("word", "instance") | ||||||
|
||||||
|
@@ -1093,6 +1104,7 @@ def __init__( | |||||
self.declare_field("qword", "highest_vcn") | ||||||
|
||||||
self.declare_field("word", "runlist_offset") | ||||||
self.runlist_offset: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("byte", "compression_unit") | ||||||
|
||||||
|
@@ -1107,6 +1119,7 @@ def __init__( | |||||
self.declare_field("byte", "reserved5") | ||||||
|
||||||
self.declare_field("qword", "allocated_size") | ||||||
self.allocated_size: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("qword", "data_size") | ||||||
self.data_size: typing.Callable[[], int] | ||||||
|
@@ -1143,7 +1156,7 @@ def structure_size( | |||||
def __len__(self): | ||||||
return self.size() | ||||||
|
||||||
def runlist(self): | ||||||
def runlist(self) -> Runlist: | ||||||
return Runlist(self._buf, self.offset() + self.runlist_offset(), self) | ||||||
|
||||||
def size(self): | ||||||
|
@@ -1159,14 +1172,14 @@ class MFT_RECORD_FLAGS: | |||||
MFT_RECORD_IS_DIRECTORY = 0x2 | ||||||
|
||||||
|
||||||
def MREF(mft_reference): | ||||||
def MREF(mft_reference) -> int: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with this suggestion, by the way. I recall that |
||||||
""" | ||||||
Given a MREF/mft_reference, return the record number part. | ||||||
""" | ||||||
return mft_reference & 0xFFFFFFFFFFFF | ||||||
|
||||||
|
||||||
def MSEQNO(mft_reference): | ||||||
def MSEQNO(mft_reference) -> int: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
""" | ||||||
Given a MREF/mft_reference, return the sequence number part. | ||||||
""" | ||||||
|
@@ -1200,6 +1213,7 @@ def __init__( | |||||
self.declare_field("qword", "lsn") | ||||||
|
||||||
self.declare_field("word", "sequence_number") | ||||||
self.sequence_number: typing.Callable[[], int] | ||||||
|
||||||
self.declare_field("word", "link_count") | ||||||
|
||||||
|
@@ -1240,10 +1254,11 @@ def attributes(self) -> typing.Iterator[Attribute]: | |||||
offset += len(a) | ||||||
yield a | ||||||
|
||||||
def attribute(self, attr_type): | ||||||
def attribute(self, attr_type) -> typing.Optional[Attribute]: | ||||||
for a in self.attributes(): | ||||||
if a.type() == attr_type: | ||||||
return a | ||||||
return None | ||||||
|
||||||
def is_directory(self) -> bool: | ||||||
return bool(self.flags() & MFT_RECORD_FLAGS.MFT_RECORD_IS_DIRECTORY) | ||||||
|
@@ -1281,6 +1296,8 @@ def filename_information(self) -> typing.Optional[FilenameAttribute]: | |||||
def standard_information(self) -> typing.Optional[StandardInformation]: | ||||||
try: | ||||||
attr = self.attribute(ATTR_TYPE.STANDARD_INFORMATION) | ||||||
if attr is None: | ||||||
return None | ||||||
return StandardInformation(attr.value(), 0, self) | ||||||
except AttributeError: | ||||||
return None | ||||||
|
@@ -1395,7 +1412,7 @@ def record_generator(self, start_at=0) -> typing.Iterator[MFTRecord]: | |||||
logging.debug("Yielding record %d", count) | ||||||
yield record | ||||||
|
||||||
def mft_get_record_buf(self, number): | ||||||
def mft_get_record_buf(self, number: int) -> array.array: | ||||||
if self.filetype == "indx": | ||||||
return array.array("B", "") | ||||||
if self.filetype == "mft": | ||||||
|
@@ -1410,8 +1427,11 @@ def mft_get_record_buf(self, number): | |||||
f.seek(self.mftoffset) | ||||||
f.seek(number * 1024, 1) | ||||||
return array.array("B", f.read(1024)) | ||||||
raise ValueError( | ||||||
"Retrieval method not defined for self.filetype = %s." % self.filetype | ||||||
) | ||||||
|
||||||
def mft_get_record(self, number): | ||||||
def mft_get_record(self, number: int) -> MFTRecord: | ||||||
buf = self.mft_get_record_buf(number) | ||||||
if buf == array.array("B", ""): | ||||||
raise InvalidMFTRecordNumber(number) | ||||||
|
@@ -1426,7 +1446,11 @@ def mft_get_record(self, number): | |||||
+ str(r.mft_record_number()) | ||||||
+ str(r.flags()), | ||||||
) | ||||||
def mft_record_build_path(self, record, cycledetector=None): | ||||||
def mft_record_build_path( | ||||||
self, | ||||||
record: MFTRecord, | ||||||
cycledetector: typing.Optional[typing.Dict[int, bool]] = None, | ||||||
) -> str: | ||||||
if cycledetector is None: | ||||||
cycledetector = {} | ||||||
rec_num = record.mft_record_number() & 0xFFFFFFFFFFFF | ||||||
|
@@ -1454,7 +1478,7 @@ def mft_record_build_path(self, record, cycledetector=None): | |||||
cycledetector[rec_num] = True | ||||||
return self.mft_record_build_path(parent, cycledetector) + "\\" + fn.filename() | ||||||
|
||||||
def mft_get_record_by_path(self, path): | ||||||
def mft_get_record_by_path(self, path) -> typing.Optional[MFTRecord]: | ||||||
# TODO could optimize here by trying to use INDX buffers | ||||||
# and actually walk through the FS | ||||||
count = -1 | ||||||
|
@@ -1468,9 +1492,9 @@ def mft_get_record_by_path(self, path): | |||||
if record_path.lower() != path.lower(): | ||||||
continue | ||||||
return record | ||||||
return False | ||||||
return None | ||||||
|
||||||
def read(self, offset, length): | ||||||
def read(self, offset, length) -> array.array: | ||||||
if self.filetype == "image": | ||||||
with open(self.filename, "rb") as f: | ||||||
f.seek(offset) | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the type hints are nice additions. thanks!