From 16d3af77032cc51fad55d57ed58c4a29cd5a5993 Mon Sep 17 00:00:00 2001 From: lingeringwillx <111698406+lingeringwillx@users.noreply.github.com> Date: Mon, 11 Mar 2024 16:42:34 +0300 Subject: [PATCH] Version 1.3.7 --- README.md | 80 ++--------------- pyproject.toml | 2 +- structio.py | 238 +++++++++++++++++++++++++++++-------------------- test.py | 139 +++-------------------------- 4 files changed, 160 insertions(+), 299 deletions(-) diff --git a/README.md b/README.md index 582945d..6a1eebd 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,7 @@ File-like object stored in memory. Extends *io.BytesIO* from the standard librar ### Attributes -**buffer**: the current content of the object (read only). +**buffer**: the current content of the object. **endian**: the default endian that would be used by the object. @@ -166,14 +166,6 @@ Return True if the end of the stream has been reached. Creates a copy of the object and returns it. -**read_all()** - -Reads and returns all of the content of the object. - -**write_all(buffer)** - -Overwrites the entire object with bytes object *buffer*. - **clear()** Clear the internal buffer of the object. @@ -182,9 +174,9 @@ Clear the internal buffer of the object. Appends bytes object *b* to the object at the current location. -**overwrite(self, start, end, b)** +**overwrite(self, length, b)** -Replaces the bytes between positions *start* and *end* with *b*. +Overwrites *length* bytes at the current position with *b*. **delete(length)** @@ -304,11 +296,11 @@ Deletes the existing Pascal string at the current position and writes *string* a **skip_pstr(numbytes, endian=None)** -Skips the null-terminated string at the current position. +Skips the Pascal string at the current position. **delete_pstr(numbytes, endian=None)** -Deletes the null-terminated string at the current position. +Deletes the Pascal string at the current position. **read_7bint()** @@ -332,64 +324,4 @@ Skips the 7 bit integer at the current position. **delete_7bint()** -Deletes the 7 bit integer at the current position. - ------ - -### Extending StructIO - -You can add your own types by inheriting from the base Struct object: - -```python -from structio import Struct, StructIO - -class ExtendedStruct(Struct): - def _get_7bstr_len(self, b, start=0): - str_len, int_len = self.unpack_7bint(b, start) - return int_len + str_len - - def unpack_7bstr(self, b, start=0): - str_len, int_len = self.unpack_7bint(b, start) - string = self.unpack_str(b[(start + int_len):(start + int_len + str_len)]) - return string, int_len + str_len - - def pack_7bstr(self, string): - b = self.pack_str(string) - return self.pack_7bint(len(b)) + b -``` - -As well as the stream object: - -```python -class ExtendedStructIO(StructIO): - def __init__(self, b=b'', endian='little'): - super().__init__(b) - self._struct = ExtendedStruct(endian) - - def copy(self): - return ExtendedStructIO(self.getvalue(), self._struct.endian) - - def _get_7bstr_len(self): - return self._struct._get_7bstr_len(self.getvalue(), start=self.tell()) - - def read_7bstr(self): - value, length = self._struct.unpack_7bstr(self.getvalue(), start=self.tell()) - self.seek(length, 1) - return value - - def write_7bstr(self, string): - return self.write(self._struct.pack_7bstr(string)) - - def append_7bstr(self, string): - return self.append(self._struct.pack_7bstr(string)) - - def overwrite_7bstr(self, string): - start = self.tell() - return self.overwrite(start, start + self._get_7bstr_len(), self._struct.pack_7bstr(string)) - - def skip_7bstr(self): - return self.seek(self._get_7bstr_len(), 1) - - def delete_7bstr(self): - return self.delete(self._get_7bstr_len()) -``` +Deletes the 7 bit integer at the current position. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 3d302c7..0596d2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "structio" -version = "1.3.6" +version = "1.3.7" description = "A Library for unparsing, parsing, and editing binary files" readme = "README.md" requires-python = ">=3.2" diff --git a/structio.py b/structio.py index 4ed792b..dd65122 100644 --- a/structio.py +++ b/structio.py @@ -1,10 +1,10 @@ import io import struct +_endians = {'big': '>', 'little': '<'} +_float_formats = {2: 'e', 4: 'f', 8: 'd'} + class Struct: - _endians = {'big': '>', 'little': '<'} - _float_formats = {2: 'e', 4: 'f', 8: 'd'} - def __init__(self, endian='little', encoding='utf-8', errors='ignore'): self.endian = endian self.encoding = encoding @@ -50,13 +50,13 @@ def pack_int(self, number, numbytes, endian=None, signed=False): return number.to_bytes(numbytes, self._get_endian(endian), signed=signed) def _get_format(self, numbytes, endian=None): - if numbytes not in self._float_formats: + if numbytes not in _float_formats: raise ValueError("float size '{}' not supported".format(numbytes)) - if endian not in self._endians: + if endian not in _endians: raise ValueError("endian '{}' is not recognized".format(endian)) - return self._endians[endian] + self._float_formats[numbytes] + return _endians[endian] + _float_formats[numbytes] def unpack_float(self, b, numbytes, endian=None): return struct.unpack(self._get_format(numbytes, self._get_endian(endian)), b)[0] @@ -125,7 +125,7 @@ def pack_7bint(self, number): b = b'' while number > 127: - b += self.pack_int(number & 0b01111111 | 0b10000000, 1) + b += self.pack_int(number & 0b01111111 | 0b10000000, 1) number >>= 7 b += self.pack_int(number, 1) @@ -134,42 +134,40 @@ def pack_7bint(self, number): class StructIO(io.BytesIO): def __init__(self, b=b'', endian='little', encoding='utf-8', errors='ignore'): super().__init__(b) - self._struct = Struct(endian, encoding, errors) + self.endian = endian + self.encoding = encoding + self.errors = errors @property def buffer(self): return self.getvalue() - @property - def endian(self): - return self._struct.endian - - @endian.setter - def endian(self, value): - self._struct.endian = value - - @property - def encoding(self): - return self._struct.encoding - - @encoding.setter - def encoding(self, value): - self._struct.encoding = value - - @property - def errors(self): - return self._struct.errors - - @errors.setter - def errors(self, value): - self._struct.errors = value + @buffer.setter + def buffer(self, b): + position = self.tell() + self.seek(0) + self.write(b) + self.truncate() + if position < len(b): + self.seek(position) + def __len__(self): - return len(self.getvalue()) + position = self.tell() + self.seek(0, 2) + length = self.tell() + self.seek(position) + return length def __eq__(self, other): return self.getvalue() == other.getvalue() + def _get_endian(self, endian): + if endian is None: + return self.endian + else: + return endian + def is_eof(self): if self.read(1) == b'': return True @@ -178,45 +176,34 @@ def is_eof(self): return False def copy(self): - return StructIO(self.getvalue(), self._struct.endian, self._struct.encoding, self._struct.errors) - - def read_all(self): - self.seek(0) - return self.getvalue() - - def write_all(self, buffer): - self.seek(0) - length = self.write(buffer) - self.truncate() - self.seek(0) - return length + return StructIO(self.getvalue(), self.endian, self.encoding, self.errors) def clear(self): self.seek(0) self.truncate() def append(self, b): - current_position = self.tell() - return self.overwrite(current_position, current_position, b) + return self.overwrite(0, b) - def overwrite(self, start, end, b): - self.seek(end) + def overwrite(self, length, b): + position = self.tell() + self.seek(length, 1) buffer = self.read() - self.seek(start) + self.seek(position) length = self.write(b) self.write(buffer) self.truncate() - self.seek(start + length) + self.seek(position + length) return length def delete(self, length): start = self.tell() - object_length = len(self) + buf_length = len(self) - if start + length > object_length: - length = object_length - start + if start + length > buf_length: + length = buf_length - start - self.overwrite(start, start + length , b'') + self.overwrite(length , b'') return length def find(self, bytes_sequence, n=1): @@ -241,71 +228,105 @@ def index(self, bytes_sequence, n=1): return location def read_bool(self): - return self._struct.unpack_bool(self.read(1)) + return self.read(1) != b'\x00' + def _pack_bool(self, boolean): + if boolean: + return b'\x01' + else: + return b'\x00' + def write_bool(self, boolean): - return self.write(self._struct.pack_bool(boolean)) + return self.write(self._pack_bool(boolean)) def append_bool(self, boolean): - return self.append(self._struct.pack_bool(boolean)) + return self.append(self._pack_bool(boolean)) def read_bits(self): - return self._struct.unpack_bits(self.read(1)) + number = self.read_int(1) + return [number >> i & 1 for i in range(8)] + + def _pack_bits(self, bits): + return self._pack_int(sum(bits[i] << i for i in range(8)), 1) def write_bits(self, bits): - return self.write(self._struct.pack_bits(bits)) + return self.write(self._pack_bits(bits)) def append_bits(self, bits): - return self.append(self._struct.pack_bits(bits)) + return self.append(self._pack_bits(bits)) def read_int(self, numbytes, endian=None, signed=False): - return self._struct.unpack_int(self.read(numbytes), endian, signed) + return int.from_bytes(self.read(numbytes), self._get_endian(endian), signed=signed) + + def _pack_int(self, number, numbytes, endian=None, signed=False): + return number.to_bytes(numbytes, self._get_endian(endian), signed=signed) def write_int(self, number, numbytes, endian=None, signed=False): - return self.write(self._struct.pack_int(number, numbytes, endian, signed)) + return self.write(self._pack_int(number, numbytes, endian, signed)) def append_int(self, number, numbytes, endian=None, signed=False): - return self.append(self._struct.pack_int(number, numbytes, endian, signed)) + return self.append(self._pack_int(number, numbytes, endian, signed)) + + def _get_format(self, numbytes, endian=None): + if numbytes not in _float_formats: + raise ValueError("float size '{}' not supported".format(numbytes)) + + if endian not in _endians: + raise ValueError("endian '{}' is not recognized".format(endian)) + + return _endians[endian] + _float_formats[numbytes] def read_float(self, numbytes, endian=None): - return self._struct.unpack_float(self.read(numbytes), numbytes, endian) + return struct.unpack(self._get_format(numbytes, self._get_endian(endian)), self.read(numbytes))[0] + + def _pack_float(self, number, numbytes, endian=None): + return struct.pack(self._get_format(numbytes, self._get_endian(endian)), number) def write_float(self, number, numbytes, endian=None): - return self.write(self._struct.pack_float(number, numbytes, endian)) + return self.write(self._pack_float(number, numbytes, endian)) def append_float(self, number, numbytes, endian=None): - return self.append(self._struct.pack_float(number, numbytes, endian)) + return self.append(self._pack_float(number, numbytes, endian)) def read_str(self, length): - return self._struct.unpack_str(self.read(length)) + return self.read(length).decode(self.encoding, errors=self.errors) + + def _pack_str(self, string): + return string.encode(self.encoding, errors=self.errors) def write_str(self, string): - return self.write(self._struct.pack_str(string)) + return self.write(self._pack_str(string)) def append_str(self, string): - return self.append(self._struct.pack_str(string)) + return self.append(self._pack_str(string)) def overwrite_str(self, string, length): - start = self.tell() - return self.overwrite(start, start + length, self._struct.pack_str(string)) + return self.overwrite(length, self._pack_str(string)) def _get_cstr_len(self): - return self._struct._get_cstr_len(self.getvalue(), start=self.tell()) + end = self.find(b'\x00') + + if end == -1: + raise ValueError('null termination not found') + + return end - self.tell() + 1 def read_cstr(self): - value, length = self._struct.unpack_cstr(self.getvalue(), start=self.tell()) - self.seek(length, 1) - return value + string = self.read_str(self._get_cstr_len() - 1) + self.seek(1, 1) + return string + + def _pack_cstr(self, string): + return self._pack_str(string) + b'\x00' def write_cstr(self, string): - return self.write(self._struct.pack_cstr(string)) + return self.write(self._pack_cstr(string)) def append_cstr(self, string): - return self.append(self._struct.pack_cstr(string)) + return self.append(self._pack_cstr(string)) def overwrite_cstr(self, string): - start = self.tell() - return self.overwrite(start, start + self._get_cstr_len(), self._struct.pack_cstr(string)) + return self.overwrite(self._get_cstr_len(), self._pack_cstr(string)) def skip_cstr(self): return self.seek(self._get_cstr_len(), 1) @@ -314,22 +335,25 @@ def delete_cstr(self): return self.delete(self._get_cstr_len()) def _get_pstr_len(self, numbytes, endian=None): - return self._struct._get_pstr_len(self.getvalue(), numbytes, endian, start=self.tell()) + length = self.read_int(numbytes, endian) + self.seek(-numbytes, 1) + return numbytes + length def read_pstr(self, numbytes, endian=None): - value, length = self._struct.unpack_pstr(self.getvalue(), numbytes, endian, start=self.tell()) - self.seek(length, 1) - return value + return self.read_str(self.read_int(numbytes, endian)) + + def _pack_pstr(self, string, numbytes, endian=None): + b = self._pack_str(string) + return self._pack_int(len(b), numbytes, endian) + b def write_pstr(self, string, numbytes, endian=None): - return self.write(self._struct.pack_pstr(string, numbytes, endian)) + return self.write(self._pack_pstr(string, numbytes, endian)) def append_pstr(self, string, numbytes, endian=None): - return self.append(self._struct.pack_pstr(string, numbytes, endian)) + return self.append(self._pack_pstr(string, numbytes, endian)) def overwrite_pstr(self, string, numbytes, endian=None): - start = self.tell() - return self.overwrite(start, start + self._get_pstr_len(numbytes, endian), self._struct.pack_pstr(string, numbytes, endian)) + return self.overwrite(self._get_pstr_len(numbytes, endian), self._pack_pstr(string, numbytes, endian)) def skip_pstr(self, numbytes, endian=None): return self.seek(self._get_pstr_len(numbytes, endian), 1) @@ -338,22 +362,46 @@ def delete_pstr(self, numbytes, endian=None): return self.delete(self._get_pstr_len(numbytes, endian)) def _get_7bint_len(self): - return self._struct._get_7bint_len(self.getvalue(), start=self.tell()) + i = 0 + while self.read_int(1) > 127: + i += 1 + + self.seek(- i - 1, 1) + return i + 1 def read_7bint(self): - value, length = self._struct.unpack_7bint(self.getvalue(), start=self.tell()) - self.seek(length, 1) - return value + number = 0 + i = 0 + + byte = self.read_int(1) + while byte > 127: + number += (byte & 0b01111111) << (7 * i) + i += 1 + + byte = self.read_int(1) + + number += byte << (7 * i) + + return number + + def _pack_7bint(self, number): + b = b'' + + while number > 127: + b += self._pack_int(number & 0b01111111 | 0b10000000, 1) + number >>= 7 + + b += self._pack_int(number, 1) + return b def write_7bint(self, number): - return self.write(self._struct.pack_7bint(number)) + return self.write(self._pack_7bint(number)) def append_7bint(self, number): - return self.append(self._struct.pack_7bint(number)) + return self.append(self._pack_7bint(number)) def overwrite_7bint(self, number): - start = self.tell() - return self.overwrite(start, start + self._get_7bint_len(), self._struct.pack_7bint(number)) + return self.overwrite(self._get_7bint_len(), self._pack_7bint(number)) def skip_7bint(self): return self.seek(self._get_7bint_len(), 1) diff --git a/test.py b/test.py index e1f6a9a..69f20e5 100644 --- a/test.py +++ b/test.py @@ -1,52 +1,6 @@ import unittest from structio import * -class ExtendedStruct(Struct): - def _get_7bstr_len(self, b, start=0): - str_len, int_len = self.unpack_7bint(b, start) - return int_len + str_len - - def unpack_7bstr(self, b, start=0): - str_len, int_len = self.unpack_7bint(b, start) - string = self.unpack_str(b[(start + int_len):(start + int_len + str_len)]) - return string, int_len + str_len - - def pack_7bstr(self, string): - b = self.pack_str(string) - return self.pack_7bint(len(b)) + b - -class ExtendedStructIO(StructIO): - def __init__(self, b=b'', endian='little'): - super().__init__(b) - self._struct = ExtendedStruct(endian) - - def copy(self): - return ExtendedStructIO(self.getvalue(), self._struct.endian) - - def _get_7bstr_len(self): - return self._struct._get_7bstr_len(self.getvalue(), start=self.tell()) - - def read_7bstr(self): - value, length = self._struct.unpack_7bstr(self.getvalue(), start=self.tell()) - self.seek(length, 1) - return value - - def write_7bstr(self, string): - return self.write(self._struct.pack_7bstr(string)) - - def append_7bstr(self, string): - return self.append(self._struct.pack_7bstr(string)) - - def overwrite_7bstr(self, string): - start = self.tell() - return self.overwrite(start, start + self._get_7bstr_len(), self._struct.pack_7bstr(string)) - - def skip_7bstr(self): - return self.seek(self._get_7bstr_len(), 1) - - def delete_7bstr(self): - return self.delete(self._get_7bstr_len()) - class PackUnpackFunctionsTest(unittest.TestCase): def testbool(self): struct = Struct() @@ -126,21 +80,17 @@ def testexample(self): stream.read_pstr(1) class GenericStreamMethodsTest(unittest.TestCase): - def testgettersetter(self): + def testbufgettersetter(self): stream = StructIO() self.assertEqual(stream.buffer, stream.getvalue()) - self.assertEqual(stream.endian, stream._struct.endian) - stream.endian = 'big' - self.assertEqual(stream.endian, stream._struct.endian) - - self.assertEqual(stream.encoding, stream._struct.encoding) - stream.encoding = 'ascii' - self.assertEqual(stream.encoding, stream._struct.encoding) + stream.seek(4) + stream.buffer = b'Unit Test' + self.assertEqual(b'Unit Test', stream.getvalue()) - self.assertEqual(stream.errors, stream._struct.errors) - stream.errors = 'strict' - self.assertEqual(stream.errors, stream._struct.errors) + stream.seek(10) + stream.buffer = b'Unit Test' + self.assertEqual(9, stream.tell()) def testlen(self): stream = StructIO(b'Unit Test') @@ -189,23 +139,12 @@ def testappend(self): def testoverwrite(self): stream = StructIO() stream.write(b'Unit Test') - stream.overwrite(0, 4, b'New') + stream.seek(0) + stream.overwrite(4, b'New') + self.assertEqual(3, stream.tell()) stream.seek(0) self.assertEqual(b'New Test', stream.read()) - def testreadall(self): - stream = StructIO() - stream.write(b'Unit Test') - self.assertEqual(b'Unit Test', stream.read_all()) - self.assertEqual(0, stream.tell()) #should be back to start - - def testwriteall(self): - stream = StructIO(b'Unit Test') - stream.seek(0, 2) - stream.write_all(b'New') - self.assertEqual(0, stream.tell()) - self.assertEqual(b'New', stream.read()) - def testdelete(self): stream = StructIO(b'Unit Test') stream.seek(4) @@ -646,62 +585,4 @@ def testdelete7bint(self): stream.delete_7bint() self.assertEqual(127, stream.read_7bint()) -class InheritanceTest(unittest.TestCase): - def testattraccess(self): - struct = ExtendedStruct() - struct.endian - - stream = ExtendedStructIO() - stream.encoding - - def testcopy(self): - stream = ExtendedStructIO() - stream2 = stream.copy() - - self.assertEqual(stream.getvalue(), stream2.getvalue()) - self.assertEqual(stream.endian, stream2.endian) - - def testpackunpack7bstr(self): - struct = ExtendedStruct() - self.assertEqual('Unit Test', struct.unpack_7bstr(struct.pack_7bstr('Unit Test'))[0]) - - def testreadwrite7bstr(self): - stream = ExtendedStructIO() - stream.write_7bstr('Unit Test') - stream.seek(0) - self.assertEqual('Unit Test', stream.read_7bstr()) - self.assertEqual(10, stream.tell()) - - def testappend7bstr(self): - stream = ExtendedStructIO() - stream.write_7bstr('Test') - stream.seek(0) - stream.append_7bstr('Unit') - stream.seek(0) - self.assertEqual('Unit', stream.read_7bstr()) - - def testoverwrite7bstr(self): - stream = ExtendedStructIO() - stream.write_7bstr('Unit Test') - stream.seek(0) - stream.overwrite_7bstr('Working') - stream.seek(0) - self.assertEqual('Working', stream.read_7bstr()) - - def testskip7bstr(self): - stream = ExtendedStructIO() - stream.write_7bstr('Unit') - stream.write_7bstr('Test') - stream.seek(0) - stream.skip_7bstr() - self.assertEqual('Test', stream.read_7bstr()) - - def testdelete7bstr(self): - stream = ExtendedStructIO() - stream.write_7bstr('Unit') - stream.write_7bstr('Test') - stream.seek(0) - stream.delete_7bstr() - self.assertEqual('Test', stream.read_7bstr()) - unittest.main() \ No newline at end of file