-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspi_processor.py
273 lines (229 loc) · 11.1 KB
/
spi_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
#!/usr/bin/env python3
"""
@file spi_processor.py
@brief A Python script that processes files of SPI communication data by running the input file through spi_processor.py with different flags.
The script can copy the input file, process it with spi_processor.py with either the -i or -f flag, and read from the SPI communication buffer.
@author Onur Ulusoy
@date 13.04.2023
@license MIT
"""
import argparse
import hashlib
import json
import os
import time
import yaml
class SpiFileProcessor:
"""
@class SpiFileProcessor
@brief A class that provides methods to process files of SPI communication data by running the input file through spi_processor.py with different flags.
"""
def __init__(self):
"""
@brief Initializes a new instance of the SpiFileProcessor class and loads configuration parameters from config.yaml.
"""
self.config = self.load_config()
self.spi_data_file = self.config["spi_data_file"]
self.input_file_find_mode = self.config["input_file_find_mode"]
self.output_file_find_mode = self.config["output_file_find_mode"]
self.input_file_input_mode = self.config["input_file_input_mode"]
self.sleep_time_find_mode = self.config["sleep_time_find_mode"]
self.spi_write_data = ""
def load_config(self):
"""
@brief Loads the configuration parameters from the `config.yaml` file.
@return A dictionary containing the configuration parameters.
"""
config_file = "config.yaml"
if os.path.exists(config_file):
with open(config_file, 'r') as yaml_file:
config = yaml.safe_load(yaml_file)
else:
assert("Config file cannot be found.")
return config
def commit_spi(self):
"""
@brief Reads the input file and saves the SPI data to a JSON file.
"""
with open(self.input_file_input_mode, 'r') as infile:
for line in infile:
line = line.strip() # Remove extra spaces and newlines
if line.startswith("spi_write"):
self.spi_write_data += line + '\n' # Add newline after each line
elif line.startswith("spi_read"):
self.save_to_json(self.spi_write_data.strip(), line) # Remove extra spaces and newlines
self.spi_write_data = ""
# Save the remaining spi_write_data
if self.spi_write_data:
self.save_to_json(self.spi_write_data.strip(), "")
def save_to_json(self, spi_write_data, spi_read_line):
"""
@brief Saves the SPI data to a JSON file.
@param spi_write_data The SPI write data.
@param spi_read_line The SPI read line.
"""
table_name = self.encrypt_write_data()
if os.path.exists(self.spi_data_file):
with open(self.spi_data_file, "r") as infile:
self.spi_data = json.load(infile)
else:
self.spi_data = {}
if table_name not in self.spi_data:
self.spi_data[table_name] = []
if not self.spi_data[table_name] or self.spi_data[table_name][-1]["spi_read_line"] != spi_read_line:
new_entry = {
"spi_write_line": spi_write_data,
"spi_read_line": spi_read_line,
"entry_count": 1,
}
self.spi_data[table_name].append(new_entry)
else:
self.spi_data[table_name][-1]["entry_count"] += 1
with open(self.spi_data_file, "w") as outfile:
json.dump(self.spi_data, outfile, indent=4)
def get_associated_spi_read(self, spi_write_data):
"""
@brief Gets the associated SPI read lines from the spi_tree dictionary based on the input spi_write_data string.
@param spi_write_data The string to search for in the spi_tree dictionary.
@return A list of associated SPI read lines.
"""
spi_read_lines = []
if spi_write_data in self.spi_tree:
all_zero = all(iteration_number == 0 for _, _, iteration_number in self.spi_tree[spi_write_data])
if all_zero:
spi_read_line, entry_count, _ = self.spi_tree[spi_write_data][0]
self.spi_tree[spi_write_data][0] = (spi_read_line, entry_count, 1)
for i, (spi_read_line, entry_count, iteration_number) in enumerate(self.spi_tree[spi_write_data]):
if iteration_number != 0:
spi_read_lines.append(spi_read_line)
# Increment iteration_number by 1
new_iteration_number = iteration_number + 1
# If iteration count == entry count, make it 0 and set the next cell's iteration to 1
if new_iteration_number > entry_count:
new_iteration_number = 0
if i + 1 < len(self.spi_tree[spi_write_data]):
next_spi_read_line, next_entry_count, _ = self.spi_tree[spi_write_data][i + 1]
self.spi_tree[spi_write_data][i + 1] = (next_spi_read_line, next_entry_count, 1)
self.spi_tree[spi_write_data][i] = (spi_read_line, entry_count, new_iteration_number)
break # exit the loop once we've found the entry with a non-zero iteration number
self.print_spi_tree()
return spi_read_lines
def create_tree_from_json(self):
"""
@brief Parses the SPI data from JSON file and creates a dictionary for the associated SPI reads.
"""
json_file = 'spi_data.json'
with open(json_file, 'r') as f:
self.spi_data = json.load(f)
self.spi_tree = {}
for table_name, rows in self.spi_data.items():
for i, row in enumerate(rows):
spi_write_line, spi_read_line, entry_count = row.values()
if spi_write_line not in self.spi_tree:
self.spi_tree[spi_write_line] = []
iteration_number = 1 if i == 0 else 0
self.spi_tree[spi_write_line].append((spi_read_line, entry_count, iteration_number))
def print_spi_tree(self):
"""
@brief Prints the associated SPI reads tree to console with color-coded output to provide easy debug.
"""
# ANSI escape codes for colors
GREEN = "\033[32m"
RESET = "\033[0m"
YELLOW = "\033[33m"
for spi_write_line, associated_spi_read_lines in self.spi_tree.items():
print(f"{spi_write_line}:")
for spi_read_line, entry_count, iteration_number in associated_spi_read_lines:
print(f" {spi_read_line} {GREEN}x{entry_count}, {YELLOW} iteration: {iteration_number}{RESET}")
print()
def get_associated_spi_read_from_file(self):
"""
@brief Reads SPI write data from the data tree and searches for associated SPI reads.
"""
with open(self.input_file_find_mode, 'r') as infile:
spi_write_lines = infile.read().strip()
with open(self.output_file_find_mode, 'w') as outfile:
if not spi_write_lines:
print("spi_write buffer is empty.")
else:
print(spi_write_lines, "*")
if spi_write_lines == "TERMINATE":
print("spi_processor script is terminating")
outfile.write("TERMINATED")
exit()
associated_spi_read_lines = self.get_associated_spi_read(spi_write_lines)
print(f"Searching for:\n{spi_write_lines}")
if associated_spi_read_lines:
print("Associated spi_read lines:")
for spi_read_line in associated_spi_read_lines:
print(spi_read_line)
outfile.write(f"{spi_read_line}\n")
else:
print("No such key found.")
print("------")
def display_spi_data(self):
"""
@brief Displays the SPI data stored in JSON file with color-coded output for easy debug.
"""
# ANSI escape codes for colors
GREEN = "\033[32m"
YELLOW = "\033[33m"
RESET = "\033[0m"
spi_write_data_color = YELLOW
spi_read_line_color = GREEN
self.create_tree_from_json()
for table_name, rows in self.spi_data.items():
for row in rows:
spi_write_line, spi_read_line, entry_count = row.values()
# Add color to the strings
colored_spi_write_data = f"{spi_write_data_color}{spi_write_line.strip()}{RESET}"
colored_spi_read_line = f"{spi_read_line_color}{spi_read_line.strip()}{RESET}"
print("\n")
print(f"{colored_spi_write_data}\nassociates\n{colored_spi_read_line}\n\n(entry count: {entry_count})")
print("\n")
def encrypt_write_data(self):
"""
@brief Creates a hash of the spi_write line to use as the representitive table name in json.
@return The table name created by hashing the spi_write line.
"""
# Create a hash of the spi_write line to use as the table name
table_name = "spi_" + hashlib.sha1(self.spi_write_data.encode()).hexdigest()
return table_name
def sleep(self):
"""
@brief Causes the current thread to sleep for a specified amount of time configured in config.yaml.
"""
time.sleep(self.sleep_time_find_mode)
def main(display_flag=False, find_flag=False, print_tree_flag=False):
"""
@brief The main function that executes the SpiFileProcessor class and its behaviors.
@param display_flag: If True, displays the SPI data.
@param find_flag: If True, finds the associated SPI read line.
@param print_tree_flag: If True, prints the SPI tree.
"""
spi_processor = SpiFileProcessor()
if not display_flag and not find_flag and not print_tree_flag:
spi_processor.commit_spi()
exit()
elif display_flag:
spi_processor.display_spi_data()
elif find_flag:
spi_processor.create_tree_from_json()
while True:
spi_processor.get_associated_spi_read_from_file()
spi_processor.sleep()
elif print_tree_flag:
spi_processor.create_tree_from_json()
spi_processor.print_spi_tree()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process input file.")
parser.add_argument("-i", "--input", help="The input text file.", action="store_true")
parser.add_argument("-d", "--display", help="Display the SPI data.", action="store_true")
parser.add_argument("-f", "--find", help="Find the associated SPI read line.", action="store_true")
parser.add_argument("-p", "--print-tree", help="Print the SPI tree.", action="store_true")
args = parser.parse_args()
if not args.display and not args.find and not args.print_tree and not args.input:
print("Error: An input file is required when not using the --display, --find, or --print-tree flags.")
parser.print_help()
else:
main(args.display, args.find, args.print_tree)