# get_duplicate_reports.py
import argparse
import glob
import json
import os
import sys
from pathlib import Path
from rich.progress import (
BarColumn,
Progress,
TextColumn,
TimeRemainingColumn,
SpinnerColumn,
TaskProgressColumn,
TimeElapsedColumn,
)

DUPLICATES_DIR = "./duplicate_reports/"
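
# Outputs written under DUPLICATES_DIR (illustrative layout, derived from main() below; the
# hash, path and size values are hypothetical):
#   ./duplicate_reports/duplicate_reports.json  -> {"<sha512>": [{"<report path>": <size>}, ...]}
#   ./duplicate_reports/duplicate_reports/      -> the discarded duplicate reports are moved here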


def parse_arguments():
    """
    Parse the command-line arguments.
    """
    parser = argparse.ArgumentParser(
        description="Detects duplicate CAPE JSON reports and moves the redundant copies aside, "
        "keeping one report per sample according to the chosen strategy.")
    parser.add_argument("json_dir", help="The directory containing one or more JSON reports.")
    parser.add_argument(
        "-d", "--duplicates", default="biggest",
        help="The strategy for duplicates, i.e. which report is kept: 'first' or 'biggest'.")
    parser.add_argument("-s", "--silent", action="store_true", help="Silent mode.")
    arguments = parser.parse_args()
    return arguments
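
# Example invocation (hypothetical directory name; '-d first' keeps the first report seen for
# each sample instead of the biggest one):
#   python3 get_duplicate_reports.py ./reports -d first

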
def search_duplicates(json_files: list, progress: Progress, task: int, silent: bool) -> dict:
"""
Search for duplicate reports in the given list of json files. Duplication is detected using the
SHA512 hash of the binary file as the identifier.
Args:
json_files: List of JSON files to search for duplicates.
progress: rich.progress.Progress object to show the processing status.
task: rich.progress.Task object to update progress.
silent: If True, no output will be printed.
Returns:
A dictionary with the duplicate reports, where the key is the SHA512 hash of the binary file,
and the value is a list of dictionaries, where each dictionary has the path to the report
and the size of the file.
"""
    seen_values = {}
    duplicate_reports = {}
    for json_file in json_files:
        with open(json_file) as file:
            try:
                cape_report = json.load(file)
            except Exception as e:
                print(f"[!!] ERROR parsing report {json_file}: {e} [!!] Skipping report")
                continue
        if not silent:
            progress.update(task, advance=1)
        report_id = cape_report['target']['file']['sha512']
        if report_id in seen_values:
            size = os.path.getsize(json_file)
            if report_id not in duplicate_reports:
                # Store the first-seen report at index 0 so the 'first' strategy keeps it
                duplicate_reports[report_id] = [
                    {seen_values[report_id]: os.path.getsize(seen_values[report_id])}]
            duplicate_reports[report_id].append({json_file: size})
        else:
            seen_values[report_id] = json_file
    return duplicate_reports
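
# Example of the structure returned by search_duplicates(), with hypothetical hash, paths and
# sizes (index 0 is the first report encountered for the sample):
#   {
#       "3fa1...9e": [
#           {"reports/100.json": 9871},
#           {"reports/245.json": 10240},
#       ]
#   }

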
def move_duplicate_reports(duplicate_reports: dict, duplicates_strat: str = 'biggest') -> list:
"""
Discards duplicate reports according to the strategy specified.
Args:
duplicate_reports: A dictionary with the duplicate reports, where the key is the SHA512 hash
of the binary file, and the value is a list of dictionaries, where each dictionary has
the path to the report and the size of the file. WARNING: it will be modified.
duplicates_strat: The strategy to follow when discarding duplicates, either 'first' (the
first to encounter prevails) or 'biggest' (the biggest file on disk prevails). (Default: 'biggest')
Returns:
A list with the paths to the reports that were moved (and should be discarded from the dataset).
"""
    files_to_discard = []
    duplicates_subdir = DUPLICATES_DIR + "duplicate_reports/"
    if duplicates_strat == 'first':
        for report in duplicate_reports:
            # Index 0 holds the first report encountered; move every other one
            for entry in duplicate_reports[report][1:]:
                path = next(iter(entry))
                Path(path).rename(duplicates_subdir + Path(path).name)
                files_to_discard.append(path)
    elif duplicates_strat == 'biggest':
        for report in duplicate_reports:
            size = next(iter(duplicate_reports[report][0].values()))
            index = 0
            for i in range(1, len(duplicate_reports[report])):
                if next(iter(duplicate_reports[report][i].values())) > size:
                    # The i-th report is bigger than the current biggest: the current biggest
                    # loses its spot and gets moved, and the i-th report becomes the new biggest
                    size = next(iter(duplicate_reports[report][i].values()))
                    loser = next(iter(duplicate_reports[report][index]))
                    index = i
                else:
                    # The i-th report is not bigger than the current biggest: move it
                    loser = next(iter(duplicate_reports[report][i]))
                Path(loser).rename(duplicates_subdir + Path(loser).name)
                files_to_discard.append(loser)
    return files_to_discard
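
# Walkthrough of the 'biggest' strategy for one entry, with hypothetical paths and sizes:
#   [{"a.json": 10}, {"b.json": 30}, {"c.json": 20}]
#   i=1: 30 > 10  -> move a.json (the old biggest); b.json becomes the biggest
#   i=2: 20 <= 30 -> move c.json
#   b.json stays in the dataset; ["a.json", "c.json"] is returned for discarding.

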
def main(json_files: list, duplicates_strat: str, silent: bool, progress: Progress) -> list:
"""
Performs the analysis for reports of duplicate binary files, taking the version specified by
the strategy argument.
Args:
json_files: List of the reports to process.
duplicates_strat: The strategy to follow when discarding duplicates, either 'first' (the
first to encounter prevails) or 'biggest' (the biggest file on disk prevails) (Default: 'biggest').
silent: If True, no output will be printed.
progress: rich.progress.Progress object to show the processing status.
Returns:
A list containing the names (paths) of found duplicate reports.
"""
total_reports_in_folder = len(json_files)
task = None
if not silent:
progress.console.rule(
"[bold orange3]Phase 2: Detect duplicate reports[/bold orange3]", style="orange3")
progress.console.log(f"[+] Total reports: [orange3]{total_reports_in_folder}")
progress.console.log(f"[+] Strategy for duplicates: [orange3]{duplicates_strat}")
task = progress.add_task("[orange3]Detect duplicate reports", total=total_reports_in_folder)
# Creating duplicates directory, if it does not exist
Path(DUPLICATES_DIR).mkdir(parents=True, exist_ok=True)
# Search for duplicate reports
duplicate_reports = search_duplicates(json_files, progress, task, silent)
### Writing duplicate reports ###
with open(DUPLICATES_DIR + "duplicate_reports.json", "w") as file:
if not silent:
progress.console.log(
f"[+] Writing reports with duplicates to [orange3]{DUPLICATES_DIR}duplicate_reports.json")
json.dump(duplicate_reports, file, indent=4)
### Moving duplicate reports to DUPLICATES_DIR/duplicate_reports/ ###
if not silent:
progress.console.log(
f"[+] Moving duplicate reports to [orange3]{DUPLICATES_DIR}duplicate_reports/")
Path(DUPLICATES_DIR + "duplicate_reports/").mkdir(parents=True, exist_ok=True)
# Take only one report for each duplicate according to the strategy provided
reports_to_discard = move_duplicate_reports(duplicate_reports, duplicates_strat)
if not silent:
progress.stop_task(task)
progress.console.rule("[bold orange3]End of Phase 2[/bold orange3]", style="orange3")
return reports_to_discard
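
# Sketch of programmatic use from a larger pipeline (hypothetical report paths; in the MALVADA
# pipeline this phase shares a single rich Progress with the other phases):
#   with Progress() as progress:
#       discarded = main(glob.glob("reports/*.json"), "biggest", False, progress)

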
if __name__ == "__main__":
"""
If this script is executed directly (and not as part of the pipeline), it will parse the
arguments and execute the `main()` function.
"""
args = parse_arguments()
progress = Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
"•",
TimeElapsedColumn(),
"•",
TimeRemainingColumn(),
)
if not args.silent:
progress.start()
progress.console.rule("[bold green]MALVADA", style="green")
    if args.duplicates not in ["first", "biggest"]:
        progress.console.log(f"[!] Error: {args.duplicates} is not a valid strategy for "
                             "duplicates, should be 'first' or 'biggest'.")
        sys.exit(1)
reports = glob.glob(args.json_dir + "/*.json")
# Check if there are no reports within the directory
    if not reports:
        progress.console.log("Error: No reports found.")
        progress.stop()
        sys.exit(1)
main(reports, args.duplicates, args.silent, progress)
if not args.silent:
progress.console.rule("[bold green]MALVADA", style="green")
progress.stop()