Skip to content

Commit

Permalink
Merge pull request #8 from Samsung/mutation_fuzzer
Browse files Browse the repository at this point in the history
Added mutations to fuzzer
  • Loading branch information
jakub-botwicz authored Aug 5, 2021
2 parents 636fd3f + c80194e commit 009291b
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 11 deletions.
128 changes: 117 additions & 11 deletions cotopaxi/protocol_fuzzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#

import os
import random
import struct
import sys
import time
Expand Down Expand Up @@ -181,18 +182,10 @@ def perform_protocol_fuzzing(test_params, test_cases):
print(prepare_separator("-"))


def load_corpus(tester, args):
def load_corpus(tester, corpus_dir_path):
"""Provide corpus of payloads based on options provided by args."""
options = tester.parse_args(args)
test_params = tester.test_params

print_verbose(test_params, "corpus_dir: {}".format(options.corpus_dir))
if options.corpus_dir:
corpus_dir_path = options.corpus_dir
else:
corpus_dir_path = os.path.dirname(__file__) + "/fuzzing_corpus/"
if test_params.protocol.name != "ALL":
corpus_dir_path += test_params.protocol.name.lower()
test_params = tester.test_params
print_verbose(test_params, prepare_separator())
testcases = []
for root, _, files in os.walk(corpus_dir_path):
Expand All @@ -203,10 +196,91 @@ def load_corpus(tester, args):
"Cannot load testcases from provided path: {}\n"
"Testing stopped!".format(corpus_dir_path)
)
print_verbose(test_params, "corpus_dir: {}".format(corpus_dir_path))
print_verbose(test_params, "Loaded corpus of {} testcases".format(len(testcases)))
return testcases


def mutate_testcase(filename, content, mutation_nr, mutation_index, mutation_data):
"""Mutate single testcase."""

mutation_nr = mutation_nr % 5
if len(content) == 0:
content = bytes(
mutation_data,
)
mutation_index = mutation_index % len(content)
mutation_data = mutation_data % 255
filename_split = filename.split(".")
# 0 - remove single byte (index given by mutation_index)
# 1 - cut content after mutation index
# 2 - change character at mutation_index to mutation data
# 3 - insert character mutation data at mutation_index
# 4 - duplicate character at mutation_index
if mutation_nr == 0:
mutation_description = "_rem_" + str(mutation_index)
new_content = content[:mutation_index] + content[mutation_index + 1 :]
elif mutation_nr == 1:
mutation_description = "_cut_" + str(mutation_index)
new_content = content[:mutation_index]
elif mutation_nr == 2:
mutation_description = "_flip_" + str(mutation_index)
new_content = (
content[:mutation_index]
+ bytes(
mutation_data,
)
+ content[mutation_index + 1 :]
)
elif mutation_nr == 3:
mutation_description = "_ins_" + str(mutation_index)
new_content = (
content[:mutation_index]
+ bytes(
mutation_data,
)
+ content[mutation_index:]
)
else:
mutation_description = "_dup_" + str(mutation_index)
new_content = (
content[:mutation_index]
+ (content[mutation_index:mutation_index])
+ content[mutation_index:]
)
if len(new_content) == 0:
new_content = bytes(
mutation_data,
)
if len(filename_split) > 1:
filename_split[-1] += mutation_description
else:
filename_split[0] += mutation_description
return ".".join(filename_split), new_content


def mutate_testcases(testcases, new_corpus_dir):
"""Mutate set of testcases."""

if not os.path.exists(new_corpus_dir):
os.makedirs(new_corpus_dir)
for testcase in testcases:
with open(testcase.payload_file, "rb") as file_handle:
testcase_payload = file_handle.read()
_, original_filename = os.path.split(testcase.payload_file)
new_filename, new_content = mutate_testcase(
original_filename,
testcase_payload,
random.randint(0, 5),
random.randint(0, 9223372036854775807),
random.randint(0, 255),
)
if len(new_filename) > 60:
new_filename = new_filename[:20] + "___" + new_filename[40:]
with open(os.path.join(new_corpus_dir, new_filename), "wb") as file:
file.write(new_content)


def main(args):
"""Start protocol fuzzer based on command line parameters."""
tester = CotopaxiTester(
Expand Down Expand Up @@ -234,10 +308,42 @@ def main(args):
" for respawning tested server",
)

testcases = load_corpus(tester, args)
tester.argparser.add_argument(
"--fuzzing-iterations",
"-FI",
action="store",
type=int,
default=0,
help="number fuzzing iterations (mutations of corpus)",
)

options = tester.parse_args(args)
if options.corpus_dir:
corpus_dir_path = options.corpus_dir
corpus_dir_base = options.corpus_dir + "_"
else:
corpus_dir_path = os.path.dirname(__file__) + "/fuzzing_corpus/"
if tester.test_params.protocol.name != "ALL":
corpus_dir_path += tester.test_params.protocol.name.lower()
corpus_dir_base = os.getcwd() + "/fuzzing_corpus_"
testcases = load_corpus(tester, corpus_dir_path)
tester.perform_testing("protocol fuzzing", perform_protocol_fuzzing, testcases)

max_iteration_len = len(str(options.fuzzing_iterations))
for i in range(options.fuzzing_iterations):
print(
prepare_separator(
"#", post_separator_text="\n\t\tStarting round {}\n".format(i + 2)
)
)
str_i = str(i)
corpus_new_dir = (
corpus_dir_base + "0" * (max_iteration_len - len(str_i)) + str_i
)
mutate_testcases(testcases, corpus_new_dir)
testcases = load_corpus(tester, corpus_new_dir)
tester.perform_testing("protocol fuzzing", perform_protocol_fuzzing, testcases)


if __name__ == "__main__":
main(sys.argv[1:])
63 changes: 63 additions & 0 deletions tests/test_protocol_fuzzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,69 @@ def test_protocol_fuzzer_mqtt_pos(self):
# print "\n" + 30 * "-" + "\n" + output + "\n" + 30 * "-" + "\n"
self.assertIn("Fuzzing stopped", output)

"""
print(mutate_testcase("test.txt", "abcd", 0, 0, 88))
print(mutate_testcase("test.txt", "abcd", 0, 1, 88))
print(mutate_testcase("test.txt", "abcd", 0, 2, 88))
print(mutate_testcase("test.txt", "abcd", 0, 3, 88))
print(mutate_testcase("test.txt", "abcd", 0, 4, 88))
print(mutate_testcase("test.txt", "a", 0, 4, 88))
print(mutate_testcase("test", "abcd", 0, 0, 10))
print(mutate_testcase("new.test.txt", "abcd", 0, 0, 10))
print(mutate_testcase("brand.new.test.txt", "abcd", 0, 0, 10))
print(mutate_testcase("test.txt", "abcd", 1, 0, 88))
print(mutate_testcase("test.txt", "abcd", 1, 1, 88))
print(mutate_testcase("test.txt", "abcd", 1, 2, 88))
print(mutate_testcase("test.txt", "abcd", 1, 3, 88))
print(mutate_testcase("test.txt", "abcd", 1, 4, 88))
print(mutate_testcase("test.txt", "abcd", 2, 0, 88))
print(mutate_testcase("test.txt", "abcd", 2, 1, 88))
print(mutate_testcase("test.txt", "abcd", 2, 2, 88))
print(mutate_testcase("test.txt", "abcd", 2, 3, 88))
print(mutate_testcase("test.txt", "abcd", 2, 4, 88))
print(mutate_testcase("test.txt", "abcd", 3, 0, 88))
print(mutate_testcase("test.txt", "abcd", 3, 1, 88))
print(mutate_testcase("test.txt", "abcd", 3, 2, 88))
print(mutate_testcase("test.txt", "abcd", 3, 3, 88))
print(mutate_testcase("test.txt", "abcd", 3, 4, 88))
print(mutate_testcase("test.txt", "abcd", 4, 0, 88))
print(mutate_testcase("test.txt", "abcd", 4, 1, 88))
print(mutate_testcase("test.txt", "abcd", 4, 2, 88))
print(mutate_testcase("test.txt", "abcd", 4, 3, 88))
print(mutate_testcase("test.txt", "abcd", 4, 4, 88))
('test_rem_0.txt', 'bcd')
('test_rem_1.txt', 'acd')
('test_rem_2.txt', 'abd')
('test_rem_3.txt', 'abc')
('test_rem_0.txt', 'bcd')
('test_rem_0.txt', 'X')
('test_rem_0', 'bcd')
('new.test_rem_0.txt', 'bcd')
('brand.new.test_rem_0.txt', 'bcd')
('test_cut_0.txt', 'X')
('test_cut_1.txt', 'a')
('test_cut_2.txt', 'ab')
('test_cut_3.txt', 'abc')
('test_cut_0.txt', 'X')
('test_flip_0.txt', 'Xbcd')
('test_flip_1.txt', 'aXcd')
('test_flip_2.txt', 'abXd')
('test_flip_3.txt', 'abcX')
('test_flip_0.txt', 'Xbcd')
('test_ins_0.txt', 'Xabcd')
('test_ins_1.txt', 'aXbcd')
('test_ins_2.txt', 'abXcd')
('test_ins_3.txt', 'abcXd')
('test_ins_0.txt', 'Xabcd')
('test_dup_0.txt', 'aabcd')
('test_dup_1.txt', 'abbcd')
('test_dup_2.txt', 'abccd')
('test_dup_3.txt', 'abcdd')
('test_dup_0.txt', 'aabcd')
"""

if __name__ == "__main__":
TEST_RUNNER = TimerTestRunner()
Expand Down

0 comments on commit 009291b

Please sign in to comment.