-
Notifications
You must be signed in to change notification settings - Fork 0
/
sem.py
executable file
·226 lines (205 loc) · 7.04 KB
/
sem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python3
import logging
import multiprocessing
import os
import secrets
import time
from argparse import ArgumentParser, Namespace
from itertools import count
from functools import partial
from unicorn import UC_SECOND_SCALE
from sem.emulation import (
DefaultRandomizer,
EmulationContext,
)
from sem.fuzzing import Experiment, ProgramProvider, RunStatus
# Let INFO-and-above records through the root logger.
logging.root.setLevel(logging.INFO)

# Module logger. Loggers are obtained through getLogger() so they are registered
# in the logging hierarchy instead of being detached, stand-alone instances.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

# Keep "pwnlib.asm" records from propagating to ancestor loggers. This has to
# act on the registered logger: a freshly constructed logging.Logger(...) is a
# separate object, and setting `propagate` on it has no effect on pwnlib.
logging.getLogger("pwnlib.asm").propagate = False
def all_subclasses(cls):
    """Return the set of all direct and indirect subclasses of *cls*.

    *cls* itself is not part of the result. A class reachable through more
    than one inheritance path appears once, since the result is a set.
    """
    found = set()
    # Worklist of classes whose own subclasses still have to be explored.
    pending = list(cls.__subclasses__())
    while pending:
        sub = pending.pop()
        if sub in found:
            continue
        found.add(sub)
        pending.extend(sub.__subclasses__())
    return found
def parse_args() -> Namespace:
    """Parse command-line arguments and error-check.

    Returns:
        The parsed ``Namespace``, post-processed as follows:

        * ``types`` is always a list (a bare ``-T`` is treated as empty).
        * ``--repro N`` is expanded to ``--program-seed N --once``.
        * ``outdir`` always ends with a path separator.

    Invalid combinations are reported through ``parser.error``, which prints
    the usage message and terminates the program.
    """
    parser = ArgumentParser(description="Compare assembly semantics through emulation")
    parser.add_argument("-a", "--arch", default="x86", help="Architecture (e.g. x86)")
    parser.add_argument("-m", "--mode", default="64", help="Emulation mode (e.g. 64)")
    parser.add_argument(
        "-c",
        "--count",
        type=int,
        default=10,
        help="Fuzz a generated program/function COUNT times",
    )
    parser.add_argument(
        "-e",
        "--experiments",
        type=int,
        default=1,
        help="Number of parallel experiments (0 to use CPU count)",
    )
    parser.add_argument(
        "-M",
        "--max-programs",
        type=int,
        default=0,
        help="Max generated programs per experiment (default: unlimited)",
    )
    parser.add_argument(
        "-s", "--seed", type=int, default=secrets.randbits(64), help="Initial seed"
    )
    parser.add_argument(
        "-o", "--outdir", default="/dev/shm/sempy", help="Experiment output root"
    )
    parser.add_argument(
        "-t", "--timeout", type=int, default=0, help="Experiment timeout (seconds)"
    )
    parser.add_argument(
        "-O",
        "--opt-levels",
        type=str,
        default="",
        required=False,
        help="Optimization levels to test (e.g. -O0123s)",
    )
    parser.add_argument(
        "-p",
        "--provider",
        default="mutate-csmith",
        # Valid provider names are discovered from the ProgramProvider subclasses
        choices=[Sub().name for Sub in all_subclasses(ProgramProvider)],
    )
    parser.add_argument(
        "-r",
        "--repro",
        type=int,
        default=None,
        help="Reproduce a program seed (--program-seed REPRO --once)",
    )
    parser.add_argument(
        "--program-seed",
        type=int,
        default=None,
        help="Seed for replicating a specific program",
    )
    parser.add_argument("--once", action="store_true", help="Emulate once and quit")
    parser.add_argument("-d", "--debug", action="store_true", help="Show debug output")
    parser.add_argument(
        "-k", "--keep-data", action="store_true", help="Keep uninteresting seed data"
    )
    parser.add_argument(
        "-R",
        "--dump-regs",
        action="store_true",
        help="Dump register values before and after emulation",
    )
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress output")

    # -p file only
    parser.add_argument(
        "-T",
        "--types",
        type=partial(str.split, sep=","),
        default=[],
        nargs="?",
        help="Function return and argument types (format: /[iufvp]\\d+/) (e.g. i64)",
    )
    parser.add_argument(
        "--fn-name",
        type=str,
        nargs="?",
        help="Function name for file provider",
    )
    parser.add_argument("files", default=[], nargs="*", help="Files to compare")

    args = parser.parse_args()

    # With nargs="?" and no const, a bare `-T` stores None. Normalise it to an
    # empty list so the checks below report a usage error instead of raising
    # TypeError on len(None) / iteration over None.
    if args.types is None:
        args.types = []

    if args.provider == "file":
        if len(args.files) < 2:
            parser.error("Expected two or more files to compare")
        if not args.types:
            parser.error("Return type is required!")
        if not args.fn_name:
            parser.error("Function name is required!")
        # Each type is one kind letter followed by a bit size, e.g. "i64"
        for arg_type in args.types:
            if len(arg_type) < 2:
                parser.error(f"Argument type too short: {arg_type}")
            if arg_type[0] not in ("i", "u", "f", "v", "p"):
                parser.error(f"Invalid argument type: {arg_type[0]}")
            if not arg_type[1:].isnumeric():
                parser.error(f"Invalid argument bit size: {arg_type[1:]}")
    else:
        if args.files or args.types:
            parser.error(
                "Option --types and files are only supported for `file` provider"
            )
        # NOTE(review): the source indentation was lost; this check is assumed to
        # apply only to non-file providers — confirm against the original layout.
        if len(args.opt_levels) < 2:
            parser.error("Expected at least two optimization levels to compare")

    # --repro is shorthand for "--program-seed REPRO --once"
    if args.repro is not None:
        args.program_seed = args.repro
        args.once = True

    # Joining with "" guarantees a trailing path separator on the output root
    args.outdir = os.path.join(args.outdir, "")
    return args
def fuzz(args: Namespace, seed: int):
start_time = time.time()
context = EmulationContext.get(args.arch, args.mode)
provider = next(
Sub() for Sub in all_subclasses(ProgramProvider) if Sub().name == args.provider
)
if provider.name == "file":
provider.set_files(args.files, args.types, args.fn_name)
expr = Experiment(
f"{provider.name} -O{args.opt_levels}",
args.outdir,
seed,
provider,
[*args.opt_levels],
args.count,
context,
DefaultRandomizer(),
int(0.5 * UC_SECOND_SCALE),
args.debug,
args.keep_data,
args.dump_regs,
)
for i in count(start=1):
status, _ = expr.run(args.program_seed)
if not args.quiet:
match status:
case RunStatus.RUN_DIFF:
if args.debug:
print(expr.make_diff_table())
else:
print("Difference found")
case RunStatus.RUN_OK:
print("No difference found")
case RunStatus.RUN_EMU_EXC:
print("Emulation exception")
case RunStatus.RUN_GEN_EXC:
print("Program generation exception")
case RunStatus.RUN_TIMEOUT:
print("Emulation timeout reached")
current_time = time.time()
if args.once or i == args.max_programs:
if status == RunStatus.RUN_DIFF and args.debug:
# Add 10 to differentiate between generic runtime error and RunStatus
exit(10 + RunStatus.RUN_DIFF)
break
# No need for precise timeouts, since each expr.run() finishes within a second
if args.timeout != 0 and current_time - start_time > args.timeout:
break
def main():
    """Parse the command line and run the requested number of experiments.

    A single experiment runs in the current process. ``--experiments 0`` is
    replaced by the CPU count. In every other case each experiment runs in its
    own ``multiprocessing.Process`` with a seed drawn from one randomizer
    seeded with ``args.seed``, and this function waits for all of them.
    """
    args = parse_args()
    rand = DefaultRandomizer(args.seed)

    # Exactly one experiment: no worker processes needed
    if args.experiments == 1:
        fuzz(args, rand.get())
        return

    if args.experiments == 0:
        args.experiments = multiprocessing.cpu_count()

    workers = []
    for _ in range(args.experiments):
        worker = multiprocessing.Process(target=fuzz, args=(args, rand.get()))
        time.sleep(0.1)  # suppress pwnlib term init error
        workers.append(worker)
        worker.start()

    # Block until every experiment has finished
    for worker in workers:
        worker.join()
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()