Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exercise MFTINDX.py with test image and extracted MFT #83

Merged
84 changes: 41 additions & 43 deletions indxparse/BinaryParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import struct
import sys
import types
from collections.abc import MutableSequence
from datetime import datetime
from typing import Dict, List, Union

Expand Down Expand Up @@ -161,7 +162,7 @@ def __call__(self, *args, **kwargs):
except KeyError:
# We have an entry not in the cache
self.misses += 1
func = types.MethodType(self.func, self.obj, self.name)
func = types.MethodType(self.func, self.obj)
value = func(*args, **kwargs)
lru = self.mru.newer # Always true
# If we haven't reached capacity
Expand Down Expand Up @@ -229,7 +230,7 @@ def align(offset, alignment):
return offset + (alignment - (offset % alignment))


def dosdate(dosdate: array.array, dostime: array.array) -> datetime:
def dosdate(dosdate: MutableSequence[int], dostime: MutableSequence[int]) -> datetime:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would really like to work towards annotations that better differentiate between mutable vs immutable data. when i see a function that takes a MutableSequence then i expect it may change the data. for these purely functional parsing routines i think this is the wrong hint. most of these routines should take bytes (or Sequence[byte] or similar abstraction). maybe we should introduce casts in a few key places to make the hints work better.

thoughts?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think collections.abc.Buffer (3.12+) and typing.ByteString (3.9-) are probably the appropriate hints.

https://docs.python.org/3/library/typing.html#typing.ByteString

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been finding this to be a challenging type annotation exercise. I am happy to explore options. I'll be back to you soon, but probably not before your end of day.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: I've added two patches. The first fixes the Python 3.8 issue. If you'd like me to rework the series so CI passes on each commit, let me know and I'll file another PR instead of force-pushing to overwrite.

The second patch applies your suggestion to differentiate mutable vs. immutable data. It induced some non-trivial updates to the code; that patch's log message document what happened.

I also think the patch ended up suggesting some code-style revisions, e.g. reviewing whether the read_* functions still need the offset parameter with bytes() used before calling; and, reviewing read_byte. But I think these are style matters and don't necessarily need to be addressed now, so I'm marking this PR as ready for review & merge if you agree and are fine with the half-passing early commits.

"""
`dosdate`: 2 bytes, little endian.
`dostime`: 2 bytes, little endian.
Expand Down Expand Up @@ -323,7 +324,7 @@ def __str__(self):


def read_byte(
buf: array.array,
buf: MutableSequence[int],
offset: int,
) -> int:
"""
Expand All @@ -335,13 +336,13 @@ def read_byte(
- `OverrunBufferException`
"""
try:
return struct.unpack_from("<B", buf, offset)[0]
return struct.unpack_from("<B", bytes(buf), offset)[0]
Copy link
Owner

@williballenthin williballenthin Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these calls to bytes(buf) make mypy happy but they aren't side effect free: i think they'll realize the mmap/array/whatever into a copy of all the data. for large byte arrays, this could be problematic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you feel about a slice on buf before feeding into bytes()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlining slicing ended up making the most sense to me.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm still concerned about these calls to bytes since they may make copies of a large buffer protocol object (like a mmap) unnecessarily.

i understand these calls are to satisfy mypy. could we replace them with typing.cast(bytes)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, cast() seems to be the right call here. I've pushed two patches to adopt that strategy.

except struct.error:
raise OverrunBufferException(offset, len(buf))


def read_word(
buf: array.array,
buf: MutableSequence[int],
offset: int,
) -> int:
"""
Expand All @@ -353,13 +354,13 @@ def read_word(
- `OverrunBufferException`
"""
try:
return struct.unpack_from("<H", buf, offset)[0]
return struct.unpack_from("<H", bytes(buf), offset)[0]
except struct.error:
raise OverrunBufferException(offset, len(buf))


def read_dword(
buf: array.array,
buf: MutableSequence[int],
offset: int,
) -> int:
"""
Expand All @@ -371,7 +372,7 @@ def read_dword(
- `OverrunBufferException`
"""
try:
return struct.unpack_from("<I", buf, offset)[0]
return struct.unpack_from("<I", bytes(buf), offset)[0]
except struct.error:
raise OverrunBufferException(offset, len(buf))

Expand All @@ -382,7 +383,7 @@ class Block(object):
A block is associated with a offset into a byte-string.
"""

def __init__(self, buf: array.array, offset: int) -> None:
def __init__(self, buf: MutableSequence[int], offset: int) -> None:
"""
Constructor.
Arguments:
Expand Down Expand Up @@ -687,7 +688,7 @@ def unpack_int8(self, offset: int) -> int:
"""
o = self._offset + offset
try:
return struct.unpack_from("<b", self._buf, o)[0]
return struct.unpack_from("<b", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand All @@ -713,7 +714,7 @@ def unpack_word_be(self, offset: int) -> int:
"""
o = self._offset + offset
try:
return struct.unpack_from(">H", self._buf, o)[0]
return struct.unpack_from(">H", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand All @@ -728,7 +729,7 @@ def unpack_int16(self, offset: int) -> int:
"""
o = self._offset + offset
try:
return struct.unpack_from("<h", self._buf, o)[0]
return struct.unpack_from("<h", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand Down Expand Up @@ -762,7 +763,7 @@ def unpack_dword_be(self, offset: int) -> int:
"""
o = self._offset + offset
try:
return struct.unpack_from(">I", self._buf, o)[0]
return struct.unpack_from(">I", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand All @@ -777,7 +778,7 @@ def unpack_int32(self, offset: int) -> int:
"""
o = self._offset + offset
try:
return struct.unpack_from("<i", self._buf, o)[0]
return struct.unpack_from("<i", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand All @@ -791,7 +792,7 @@ def unpack_qword(self, offset: int) -> int:
"""
o = self._offset + offset
try:
return struct.unpack_from("<Q", self._buf, o)[0]
return struct.unpack_from("<Q", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand All @@ -806,7 +807,7 @@ def unpack_int64(self, offset: int) -> int:
"""
o = self._offset + offset
try:
return struct.unpack_from("<q", self._buf, o)[0]
return struct.unpack_from("<q", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand All @@ -821,7 +822,7 @@ def unpack_float(self, offset: int) -> float:
"""
o = self._offset + offset
try:
return struct.unpack_from("<f", self._buf, o)[0]
return struct.unpack_from("<f", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand All @@ -836,7 +837,7 @@ def unpack_double(self, offset: int) -> float:
"""
o = self._offset + offset
try:
return struct.unpack_from("<d", self._buf, o)[0]
return struct.unpack_from("<d", bytes(self._buf), o)[0]
except struct.error:
raise OverrunBufferException(o, len(self._buf))

Expand All @@ -855,7 +856,7 @@ def unpack_binary(self, offset: int, length=0) -> array.array:
o = self._offset + offset
try:
return array.array(
"B", struct.unpack_from("<%ds" % (length), self._buf, o)[0]
"B", struct.unpack_from("<%ds" % (length), bytes(self._buf), o)[0]
)
except struct.error:
raise OverrunBufferException(o, len(self._buf))
Expand All @@ -881,11 +882,9 @@ def unpack_wstring(self, offset: int, length: int) -> str:
Throws:
- `UnicodeDecodeError`
"""
return (
return bytes(
self._buf[self._offset + offset : self._offset + offset + 2 * length]
.tobytes()
.decode("utf-16le")
)
).decode("utf-16le")

def unpack_dosdate(self, offset: int) -> datetime:
"""
Expand Down Expand Up @@ -925,7 +924,7 @@ def unpack_systemtime(self, offset: int) -> datetime:
"""
o = self._offset + offset
try:
parts = struct.unpack_from("<WWWWWWWW", self._buf, o)
parts = struct.unpack_from("<WWWWWWWW", bytes(self._buf), o)
except struct.error:
raise OverrunBufferException(o, len(self._buf))
return datetime(
Expand Down Expand Up @@ -954,26 +953,25 @@ def unpack_guid(self, offset: int) -> str:
raise OverrunBufferException(o, len(self._buf))

# Yeah, this is ugly
h = list(map(ord, _bin))
return (
"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x"
% (
h[3],
h[2],
h[1],
h[0],
h[5],
h[4],
h[7],
h[6],
h[8],
h[9],
h[10],
h[11],
h[12],
h[13],
h[14],
h[15],
_bin[3],
_bin[2],
_bin[1],
_bin[0],
_bin[5],
_bin[4],
_bin[7],
_bin[6],
_bin[8],
_bin[9],
_bin[10],
_bin[11],
_bin[12],
_bin[13],
_bin[14],
_bin[15],
)
)

Expand Down Expand Up @@ -1003,14 +1001,14 @@ class Nestable(object):

def __init__(
self,
buf: array.array,
buf: MutableSequence[int],
offset: int,
) -> None:
super(Nestable, self).__init__()

@staticmethod
def structure_size(
buf: array.array,
buf: MutableSequence[int],
offset: int,
parent,
) -> int:
Expand Down
Loading
Loading