Skip to content

Commit 8ce999f

Browse files
authored
Add LEB128 variable-length integer support (#73)
1 parent 772d473 commit 8ce999f

File tree

5 files changed

+304
-0
lines changed

5 files changed

+304
-0
lines changed

dissect/cstruct/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from dissect.cstruct.types.enum import Enum, EnumInstance
1515
from dissect.cstruct.types.flag import Flag, FlagInstance
1616
from dissect.cstruct.types.instance import Instance
17+
from dissect.cstruct.types.leb128 import LEB128
1718
from dissect.cstruct.types.packedtype import PackedType
1819
from dissect.cstruct.types.pointer import Pointer, PointerInstance
1920
from dissect.cstruct.types.structure import Field, Structure, Union
@@ -44,6 +45,7 @@
4445
"Union",
4546
"Field",
4647
"Instance",
48+
"LEB128",
4749
"Structure",
4850
"Expression",
4951
"PackedType",

dissect/cstruct/cstruct.py

+4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from dissect.cstruct.exceptions import ResolveError
88
from dissect.cstruct.parser import CStyleParser, TokenParser
99
from dissect.cstruct.types import (
10+
LEB128,
1011
Array,
1112
BaseType,
1213
BytesInteger,
@@ -59,6 +60,9 @@ def __init__(self, endian: str = "<", pointer: Optional[str] = None):
5960
"int128": BytesInteger(self, "int128", 16, True, alignment=16),
6061
"uint128": BytesInteger(self, "uint128", 16, False, alignment=16),
6162

63+
"uleb128": LEB128(self, 'uleb128', None, False),
64+
"ileb128": LEB128(self, 'ileb128', None, True),
65+
6266
"void": VoidType(),
6367

6468
# Common C types not covered by internal types

dissect/cstruct/types/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dissect.cstruct.types.enum import Enum, EnumInstance
55
from dissect.cstruct.types.flag import Flag, FlagInstance
66
from dissect.cstruct.types.instance import Instance
7+
from dissect.cstruct.types.leb128 import LEB128
78
from dissect.cstruct.types.packedtype import PackedType
89
from dissect.cstruct.types.pointer import Pointer, PointerInstance
910
from dissect.cstruct.types.structure import Field, Structure, Union
@@ -21,6 +22,7 @@
2122
"Flag",
2223
"FlagInstance",
2324
"Instance",
25+
"LEB128",
2426
"PackedType",
2527
"Pointer",
2628
"PointerInstance",

dissect/cstruct/types/leb128.py

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Any, BinaryIO
4+
5+
from dissect.cstruct.types.base import RawType
6+
7+
if TYPE_CHECKING:
8+
from dissect.cstruct import cstruct
9+
10+
11+
class LEB128(RawType):
    """Variable-length code compression to store an arbitrarily large integer in a small number of bytes.

    Every encoded byte carries seven payload bits, least-significant group first. The high bit
    (``0x80``) is set on every byte except the final one. In signed mode, bit ``0x40`` of the final
    byte acts as the sign bit and is extended over the decoded value.

    See https://en.wikipedia.org/wiki/LEB128 for more information and an explanation of the algorithm.
    """

    # Whether values are interpreted as signed (two's complement) integers.
    signed: bool

    def __init__(self, cstruct: cstruct, name: str, size: int | None, signed: bool, alignment: int = 1):
        """Store the signedness and forward the remaining arguments to the base type.

        Args:
            cstruct: The owning cstruct instance, passed through to the base initializer.
            name: The type name, passed through to the base initializer.
            size: The type size, passed through to the base initializer. May be ``None``.
            signed: ``True`` for signed LEB128, ``False`` for unsigned LEB128.
            alignment: The type alignment, passed through to the base initializer.
        """
        self.signed = signed
        super().__init__(cstruct, name, size, alignment)

    def _read(self, stream: BinaryIO, context: dict[str, Any] | None = None) -> int:
        """Decode a single LEB128 value from ``stream``.

        Bytes are consumed one at a time until a byte without the continuation bit is found.

        Args:
            stream: Binary stream positioned at the first byte of the encoded value.
            context: Unused by this method.

        Returns:
            The decoded integer.

        Raises:
            EOFError: If the stream ends before a terminating byte (high bit clear) was read.
        """
        result = 0
        shift = 0
        while True:
            raw = stream.read(1)
            if raw == b"":
                raise EOFError("EOF reached, while final LEB128 byte was not yet read")

            byte = raw[0]
            result |= (byte & 0x7F) << shift
            shift += 7
            if not byte & 0x80:
                # Continuation bit is clear: this was the final byte.
                break

        # Sign-extend: when the sign bit of the final byte is set, fill all higher bits with ones.
        if self.signed and byte & 0x40:
            result |= -1 << shift

        return result

    def _read_0(self, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[int]:
        """Decode consecutive LEB128 values until a value equal to ``0`` is read.

        The terminating ``0`` is consumed from the stream but not included in the result.

        Args:
            stream: Binary stream positioned at the first encoded value.
            context: Forwarded to :meth:`_read`.

        Returns:
            The decoded integers that preceded the terminator.

        Raises:
            EOFError: If the stream ends before the terminator was read.
        """
        result = []
        while (value := self._read(stream, context)) != 0:
            result.append(value)

        return result

    def _write(self, stream: BinaryIO, data: int) -> int:
        """Encode ``data`` as LEB128 and write it to ``stream``.

        Args:
            stream: Binary stream to write the encoded bytes to.
            data: The integer to encode.

        Returns:
            The number of bytes in the encoded value.

        Raises:
            ValueError: If ``data`` is negative and this type is unsigned.
        """
        # only write negative numbers when in signed mode
        if data < 0 and not self.signed:
            raise ValueError("Attempt to encode a negative integer using unsigned LEB128 encoding")

        result = bytearray()
        while True:
            # low-order 7 bits of value
            byte = data & 0x7F
            data >>= 7

            # Signed and unsigned encoding only differ in when to stop. Unsigned stops once no bits
            # remain. Signed stops once the remaining value is pure sign extension (0 or -1) and the
            # sign bit of the emitted byte agrees with it.
            sign_bit_set = bool(byte & 0x40)
            if self.signed:
                done = (data == 0 and not sign_bit_set) or (data == -1 and sign_bit_set)
            else:
                done = data == 0

            if done:
                result.append(byte)
                break

            # Set high-order bit of byte to signal that more bytes follow
            result.append(0x80 | byte)

        stream.write(result)
        return len(result)

    def _write_0(self, stream: BinaryIO, data: list[int]) -> int:
        """Write ``data`` followed by a terminating ``0`` value.

        The actual writing is delegated to ``self._write_array``. Because ``_read_0`` stops at the
        first ``0``, a ``0`` inside ``data`` would end the array early when it is read back.

        Args:
            stream: Binary stream to write to.
            data: The integers to encode; any iterable of integers is accepted.

        Returns:
            Whatever ``self._write_array`` returns (presumably the number of bytes written).
        """
        return self._write_array(stream, [*data, 0])

    def default(self) -> int:
        """Return the default value for this type, which is ``0``."""
        return 0

    def default_array(self, count: int) -> list[int]:
        """Return a list of ``count`` zeroes as the default array value."""
        return [0] * count

tests/test_types_leb128.py

+210
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
import io
2+
3+
import pytest
4+
from dissect import cstruct
5+
6+
7+
def test_leb128_unsigned_read_EOF():
    """A single byte with the continuation bit set and nothing after it must raise ``EOFError``."""
    truncated = b"\x8b"
    expected_message = "EOF reached, while final LEB128 byte was not yet read"

    parser = cstruct.cstruct()

    with pytest.raises(EOFError, match=expected_message):
        parser.uleb128(truncated)
12+
13+
14+
def test_leb128_unsigned_read():
    """Decode known byte strings as ``uleb128`` and compare with the expected integers."""
    parser = cstruct.cstruct()

    # (encoded bytes, expected unsigned value)
    vectors = [
        (b"\x02", 2),
        (b"\x8b\x25", 4747),
        (b"\xc9\x8f\xb0\x06", 13371337),
        (b"\x7e", 126),
        (b"\xf5\x5a", 11637),
        (b"\xde\xd6\xcf\x7c", 261352286),
    ]

    for encoded, expected in vectors:
        assert parser.uleb128(encoded) == expected
23+
24+
25+
def test_leb128_signed_read():
    """Decode known byte strings as ``ileb128`` and compare with the expected integers."""
    parser = cstruct.cstruct()

    # (encoded bytes, expected signed value)
    vectors = [
        (b"\x02", 2),
        (b"\x8b\x25", 4747),
        (b"\xc9\x8f\xb0\x06", 13371337),
        (b"\x7e", -2),
        (b"\xf5\x5a", -4747),
        (b"\xde\xd6\xcf\x7c", -7083170),
    ]

    for encoded, expected in vectors:
        assert parser.ileb128(encoded) == expected
34+
35+
36+
@pytest.mark.parametrize("compiled", [True, False])
def test_leb128_struct_unsigned(compiled):
    """Parse a struct whose ``char`` array length comes from a ``uleb128`` field, then dump it again."""
    payload_size = 3119
    payload = b"\x41" * payload_size
    # Two-byte uleb128 length prefix followed by the payload
    buf = b"\xaf\x18" + payload

    cdef = """
    struct test {
        uleb128 len;
        char data[len];
    };
    """
    parser = cstruct.cstruct()
    parser.load(cdef, compiled=compiled)

    obj = parser.test(buf)

    assert obj.len == payload_size
    assert obj.data == payload
    assert len(obj.data) == payload_size
    assert len(buf) == payload_size + 2

    # Serializing must reproduce the input exactly
    assert obj.dumps() == buf
57+
58+
59+
@pytest.mark.parametrize("compiled", [True, False])
def test_leb128_struct_unsigned_zero(compiled):
    """Parse an unsized ``uleb128`` array that is ended by a zero value, then dump it again."""
    expected = [3119, 4747, 13371337]
    # Three encoded values followed by the zero terminator
    buf = b"".join([b"\xaf\x18", b"\x8b\x25", b"\xc9\x8f\xb0\x06", b"\x00"])

    cdef = """
    struct test {
        uleb128 numbers[];
    };
    """
    parser = cstruct.cstruct()
    parser.load(cdef, compiled=compiled)

    obj = parser.test(buf)

    assert len(obj.numbers) == len(expected)
    for index, value in enumerate(expected):
        assert obj.numbers[index] == value

    assert obj.dumps() == buf
78+
79+
80+
@pytest.mark.parametrize("compiled", [True, False])
def test_leb128_struct_signed_zero(compiled):
    """Parse an unsized ``ileb128`` array that is ended by a zero value, then dump it again."""
    expected = [3119, -4747, -7083170]
    # Three encoded values followed by the zero terminator
    buf = b"".join([b"\xaf\x18", b"\xf5\x5a", b"\xde\xd6\xcf\x7c", b"\x00"])

    cdef = """
    struct test {
        ileb128 numbers[];
    };
    """
    parser = cstruct.cstruct()
    parser.load(cdef, compiled=compiled)

    obj = parser.test(buf)

    assert len(obj.numbers) == len(expected)
    for index, value in enumerate(expected):
        assert obj.numbers[index] == value

    assert obj.dumps() == buf
99+
100+
101+
@pytest.mark.parametrize("compiled", [True, False])
def test_leb128_nested_struct_unsigned(compiled):
    """Parse a struct holding a ``uleb128``-counted array of ``uleb128``-prefixed entries, then dump it."""
    entry_count = 300
    name = b"\x54\x65\x73\x74\x66\x69\x6c\x65"

    # Dummy file format: name length, name, then the number of entries (300)
    header = b"\x08" + name + b"\xac\x02"
    # Each entry has a length byte, 4 byte data + 4 byte CRC
    entry = b"\x04\x41\x41\x41\x41\x42\x42\x42\x42"
    buf = header + entry * entry_count

    cdef = """
    struct entry {
        uleb128 len;
        char data[len];
        uint32 crc;
    };
    struct nested {
        uleb128 name_len;
        char name[name_len];
        uleb128 n_entries;
        entry entries[n_entries];
    };
    """
    parser = cstruct.cstruct()
    parser.load(cdef, compiled=compiled)

    obj = parser.nested(buf)

    assert obj.name_len == 8
    assert obj.name == name
    assert obj.n_entries == entry_count

    assert obj.dumps() == buf
132+
133+
134+
@pytest.mark.parametrize("compiled", [True, False])
def test_leb128_nested_struct_signed(compiled):
    """Parse a struct holding an ``ileb128``-counted array of ``ileb128``-prefixed entries, then dump it."""
    entry_count = 300
    name = b"\x54\x65\x73\x74\x66\x69\x6c\x65"

    # Dummy file format: name length, name, then the number of entries (300)
    header = b"\x08" + name + b"\xac\x02"
    # Each entry has a length byte, 4 byte data + 4 byte CRC
    entry = b"\x04\x41\x41\x41\x41\x42\x42\x42\x42"
    buf = header + entry * entry_count

    cdef = """
    struct entry {
        ileb128 len;
        char data[len];
        uint32 crc;
    };
    struct nested {
        ileb128 name_len;
        char name[name_len];
        ileb128 n_entries;
        entry entries[n_entries];
    };
    """
    parser = cstruct.cstruct()
    parser.load(cdef, compiled=compiled)

    obj = parser.nested(buf)

    assert obj.name_len == 8
    assert obj.name == name
    assert obj.n_entries == entry_count

    assert obj.dumps() == buf
165+
166+
167+
def test_leb128_unsigned_write():
    """Encode known integers as ``uleb128`` and compare with the expected byte strings."""
    parser = cstruct.cstruct()

    # (value, expected unsigned encoding)
    vectors = [
        (2, b"\x02"),
        (4747, b"\x8b\x25"),
        (13371337, b"\xc9\x8f\xb0\x06"),
        (126, b"\x7e"),
        (11637, b"\xf5\x5a"),
        (261352286, b"\xde\xd6\xcf\x7c"),
    ]

    for value, encoded in vectors:
        assert parser.uleb128.dumps(value) == encoded
176+
177+
178+
def test_leb128_signed_write():
    """Encode known integers as ``ileb128`` and compare with the expected byte strings."""
    parser = cstruct.cstruct()

    # (value, expected signed encoding)
    vectors = [
        (2, b"\x02"),
        (4747, b"\x8b\x25"),
        (13371337, b"\xc9\x8f\xb0\x06"),
        (-2, b"\x7e"),
        (-4747, b"\xf5\x5a"),
        (-7083170, b"\xde\xd6\xcf\x7c"),
    ]

    for value, encoded in vectors:
        assert parser.ileb128.dumps(value) == encoded
187+
188+
189+
def test_leb128_write_negatives():
    """A negative value is rejected by ``uleb128`` but encodes normally as ``ileb128``."""
    negative = -2
    expected_message = "Attempt to encode a negative integer using unsigned LEB128 encoding"

    parser = cstruct.cstruct()

    with pytest.raises(ValueError, match=expected_message):
        parser.uleb128.dumps(negative)

    assert parser.ileb128.dumps(negative) == b"\x7e"
195+
196+
197+
def test_leb128_unsigned_write_amount_written():
    """The count returned by ``uleb128.write`` must equal the stream position after writing."""
    parser = cstruct.cstruct()

    # Each value gets a fresh stream so the position equals the bytes written for that value
    for value in (2, 4747, 13371337):
        out = io.BytesIO()
        bytes_written = parser.uleb128.write(out, value)
        assert bytes_written == out.tell()

0 commit comments

Comments
 (0)