-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdbAccess.py
337 lines (290 loc) · 12.9 KB
/
dbAccess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
#########################
# R8DIUM : Run8 Database for Integrated User Management
#
# Copyright (C) 2023, S. Joshua Stein, <[email protected]>
#
# This file is part of the R8DIUM software tool suite.
#
# R8DIUM is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
#
# R8DIUM is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with R8DIUM.
# If not, see <https://www.gnu.org/licenses/>.
##########################
import csv
import hashlib
import requests
import uuid
import xmltodict
from r8diumInclude import SECURITY_FILE, DB_FILENAME, SEND_STATS, SOFTWARE_VERSION, STAT_URL, STAT_TOKEN
# Don't bother trying to send stats until we get a decent endpoint
if STAT_URL == '':
    # No endpoint configured: override the imported flag so send_statistics() does nothing
    SEND_STATS = False
# Tag names used by Run8 inside the host security XML file
XML_ROOT_NAME = 'HostSecurityData'
# Banned users: category element and the per-user element inside it
XML_BANNED_CATEGORY_NAME = 'Banned_Users'
XML_BANNED_NAME = 'BannedUser'
# Users allowed to log in: category element and the per-user element inside it
XML_UNIQUE_CATEGORY_NAME = 'Unique_Logins'
XML_UNIQUE_NAME = 'UniqueLogin'
# Child elements of a single user entry
XML_NAME = 'Name'
XML_UID = 'UID'
XML_IP = 'IP'
XML_PASSWORD = 'Password'

# Database column names.
# The trailing comment gives the logical type of each column; inside the records every value is kept as text
# (e.g. banned holds 'True' / 'False').
sid = 'sid'                    # int (unique)
discord_name = 'discord_name'  # str
discord_id = 'discord_id'      # int
run8_name = 'run8_name'        # str
uid = 'uid'                    # str
role = 'role'                  # str
password = 'password'          # str
join_date = 'join_date'        # str
last_login = 'last_login'      # str
active = 'active'              # bool
ip = 'ip'                      # str
banned = 'banned'              # bool
ban_date = 'ban_date'          # str
ban_duration = 'ban_duration'  # str
notes = 'notes'                # str[]

# Column order of the CSV database file (used for the header row and for blank records)
db_field_list = [
    sid,
    discord_name,
    discord_id,
    run8_name,
    uid,
    role,
    password,
    join_date,
    last_login,
    active,
    ip,
    banned,
    ban_date,
    ban_duration,
    notes,
]
def load_db(filename: str) -> list:
    """
    Load the user database from the CSV file <filename>.

    Returns a list holding one dict per CSV row, keyed by the column names found in the file's header row.
    If <filename> does not exist, a new file containing only the header row (db_field_list) is created
    and an empty list is returned.
    Any other failure while reading is fatal: a message is printed and the program exits with status -1.
    """
    try:
        with open(filename, newline='') as csvfile:
            return list(csv.DictReader(csvfile))
    except FileNotFoundError:
        print(f'\nr8dium: Database file {filename} not found, creating a new one')
        with open(filename, 'w', newline='') as csvfile:
            csv.DictWriter(csvfile, fieldnames=db_field_list).writeheader()
        # A freshly created database holds the header row only, i.e. no records
        return []
    except Exception as e:
        print(f'\nr8dium ({__name__}.py): FATAL exception in load_db -> {e.__class__.__name__} ({e}) - contact devs')
        # SystemExit is what exit() raises; raising it directly does not depend on the site module's helper
        raise SystemExit(-1)
def save_db(filename: str, ldb: list) -> int:
    """
    Write every record of <ldb> to the CSV file <filename>, replacing its previous contents.

    The header row and the column order are taken from db_field_list.
    Returns the number of records written.
    Any failure is fatal: a message is printed and the program exits with status -1.
    """
    try:
        with open(filename, 'w', newline='') as csvfile:
            csvwriter = csv.DictWriter(csvfile, fieldnames=db_field_list)
            csvwriter.writeheader()
            for row in ldb:
                csvwriter.writerow(row)
        return len(ldb)
    except Exception as e:
        print(f'\nr8dium ({__name__}.py): FATAL exception in save_db -> {e.__class__.__name__} ({e}) - contact devs')
        # SystemExit is what exit() raises; raising it directly does not depend on the site module's helper
        raise SystemExit(-1)
def send_statistics(ldb: list):
    """
    Post usage statistics to STAT_URL; does nothing unless SEND_STATS is set.

    The payload holds a hashed server id ('a'), SOFTWARE_VERSION ('b') and the number of records in <ldb> ('c').
    STAT_TOKEN is sent as the Authorization header.
    Sending is best-effort: a failed or timed-out request is reported on stdout and otherwise ignored.
    Returns None.
    """
    if not SEND_STATS:
        return
    # Create unique hashed server id
    # NOTE(review): the shift below moves by 5..0 *bits* (not bytes), so this string is not the textual MAC
    # address. It is left as-is because changing it would change the id an existing server reports.
    node = uuid.getnode()
    server_mac_addr = ''.join('{:02x}'.format((node >> elements) & 0xff) for elements in range(5, -1, -1))
    server_id = hashlib.md5((server_mac_addr + SECURITY_FILE[0]).encode()).hexdigest()
    header_dict = {'Authorization': STAT_TOKEN}
    put_dict = {'a': server_id, 'b': SOFTWARE_VERSION, 'c': len(ldb)}
    try:
        # Without a timeout requests would wait indefinitely on an unresponsive endpoint
        requests.post(STAT_URL, data=put_dict, headers=header_dict, timeout=10)
    except requests.RequestException as e:
        print(f'\nr8dium ({__name__}.py): unable to send statistics -> {e}')
    return
def write_security_file(ldb: list, purge_uids=False):
    """
    Build the Run8 host security XML from <ldb> and write it to every file listed in SECURITY_FILE.

    Before anything is written, merge_security_file() is called so UIDs present in the existing XML are
    captured in <ldb>.
    Records with banned == 'True' are written as banned users (name, UID, IP).
    Records with banned == 'False' and a non-empty password are written as unique logins (name, UID, password).
    All other records are left out of the XML.
    If <purge_uids> is true, every UID is written as None instead of the value stored in the record.
    Returns 'file written' on success.
    A failure while writing is fatal: a message is printed and the program exits with status -1.
    """
    # First we merge in the Run8 XML security file to capture any changes before overwriting
    merge_security_file(ldb)

    ban_list = []
    usr_list = []
    for record in ldb:
        l_uid = None if purge_uids else record[uid]
        if record[banned] == 'True':
            ban_list.append({XML_NAME: record[run8_name],
                             XML_UID: l_uid,
                             XML_IP: record[ip]})
        elif record[banned] == 'False' and record[password] != '':  # Don't save users without pw
            usr_list.append({XML_NAME: record[run8_name],
                             XML_UID: l_uid,
                             XML_PASSWORD: record[password]})

    xml_dict = {XML_ROOT_NAME: {XML_BANNED_CATEGORY_NAME: {XML_BANNED_NAME: ban_list},
                                XML_UNIQUE_CATEGORY_NAME: {XML_UNIQUE_NAME: usr_list}}}
    xml_out = xmltodict.unparse(xml_dict, pretty=True)
    try:
        for fname in SECURITY_FILE:
            # The context manager closes the file even if the write fails
            with open(fname, 'w') as wp:
                wp.write(xml_out)
        return 'file written'
    except Exception as e:
        print(f'\nr8dium ({__name__}.py): FATAL exception in write_security_file -> '
              f'{e.__class__.__name__} ({e}) - contact devs')
        # SystemExit is what exit() raises; raising it directly does not depend on the site module's helper
        raise SystemExit(-1)
def merge_security_file(ldb: list):
    """
    Copy UIDs which Run8 recorded in the host security XML file(s) into the database <ldb>.

    For every UniqueLogin entry of each file in SECURITY_FILE the entry's password is looked up in <ldb>:
      - password not in the database        -> only reported
      - entry has no UID element            -> only reported
      - record has no UID ('' or 'none')    -> the XML UID is stored in the record and the database file is saved
      - record already has a UID            -> compared with the XML UID and reported, the record is NOT changed
    Returns the merge report (str). A 'File merge error : ...' string is returned instead as soon as a file
    does not contain a list of UniqueLogin entries.
    An unreadable security file is fatal: a message is printed and the program exits with status -1.
    """
    # NOTE(review): the listing this function was restored from had lost its indentation, so the extent of the
    # per-file loop is a reconstruction: every file in SECURITY_FILE is merged and one report covers all of
    # them. With a single security file this is the same for every possible reading - confirm for several.
    retstr = '`Merge results:\n-------------\n'
    for fname in SECURITY_FILE:
        try:
            # Read current HostSecurity file and update UID fields based on password (gross)
            with open(fname, 'r') as fp:
                xml_in = xmltodict.parse(fp.read(), process_namespaces=True)
        except FileNotFoundError as e:
            print(f'\nr8dium ({__name__}.py): FATAL exception -> {e}')
            raise SystemExit(-1)
        except Exception as e:
            print(f'\nr8dium ({__name__}.py): FATAL exception in merge_security_file -> '
                  f'{e.__class__.__name__} ({e}) - contact devs')
            raise SystemExit(-1)

        # isinstance (rather than an exact type test) also accepts dict subclasses such as OrderedDict
        if not isinstance(xml_in[XML_ROOT_NAME], dict):
            # No entries in XML, just return
            return 'File merge error : No category names found'
        if not isinstance(xml_in[XML_ROOT_NAME][XML_UNIQUE_CATEGORY_NAME], dict):
            # No user entries in XML, just return
            return 'File merge error : No names found'
        entries = xml_in[XML_ROOT_NAME][XML_UNIQUE_CATEGORY_NAME][XML_UNIQUE_NAME]
        if isinstance(entries, dict):
            # Edge case: a single XML_UNIQUE_NAME entry is parsed as a dict instead of a list,
            # just return and allow complete rewrite
            return f'File merge error : only one {XML_UNIQUE_NAME} entry found'

        for record, entry in enumerate(entries):
            retstr += f'{record: 03d}: '
            xml_password = entry[XML_PASSWORD]
            new_sid = get_element(xml_password, password, sid, ldb)
            if new_sid == -1:
                retstr += f'Password [{xml_password}] not found\n'
                continue

            # Found xml password in database
            try:
                new_uid = entry[XML_UID]
            except KeyError:
                retstr += f'Found password but no UID (in XML) for SID[{new_sid}]\n'
                continue

            current_uid = get_element(new_sid, sid, uid, ldb)
            if not current_uid or current_uid.lower() == 'none':
                # No UID in database, so populate with XML version.
                # The update is done right here for this record only; a flag which stays set for the rest of
                # the loop would also overwrite the UIDs of later records.
                # TODO: once Run8 records the player name in the XML, also copy entry[XML_NAME] into run8_name
                retstr += f'added UID[{new_uid}] ' \
                          f'to SID[{new_sid}] ({get_element(new_sid, sid, discord_name, ldb)})\n'
                set_element(new_sid, sid, uid, new_uid, ldb)
                save_db(DB_FILENAME, ldb)
            elif new_uid != current_uid:
                # db and xml do not match, what to do? Just notify user for now
                # Maybe we should start keeping a list of these UIDs? (ala Notes)
                retstr += f'Existing UID mismatch for SID[{new_sid}]:\n'
                retstr += f' database file UID: {current_uid}\n'
                retstr += f' host_security UID: {new_uid}\n'
                retstr += (f'Database file NOT updated for this record\n'
                           f'-----------------------------------------\n')
            else:
                retstr += f'Existing UID valid for SID[{new_sid}]\n'
    return retstr
def get_index(key, search_col: str, ldb: list):
    """
    Return index (position in <ldb>) of the first record whose column <search_col> equals <key>.
    <key> is converted to str before it is compared.
    returns (-1) if <key> is not found or an invalid column name is specified.
    NOTE: Will return first instance if there are duplicates
    """
    try:
        wanted = str(key)
        column = str(search_col)
        for index, line in enumerate(ldb):
            if line[column] == wanted:
                return index
        return -1
    except Exception as e:
        # e.g. KeyError for a column name the records do not have
        print(f'{get_index.__name__} : {e.__class__.__name__} ({e})')
        return -1
def get_element(key, search_col: str, return_col: str, ldb: list):
    """
    Return value of column <return_col> of the first record whose column <search_col> equals <key>.
    <key> is converted to str before it is compared.
    returns (-1) if <key> is not found or an invalid column name is specified.
    NOTE: Will return first instance if there are duplicates
    """
    try:
        wanted = str(key)
        column = str(search_col)
        for line in ldb:
            if line[column] == wanted:
                return line[str(return_col)]
        return -1
    except Exception as e:
        # e.g. KeyError for a column name the records do not have
        print(f'{get_element.__name__} : {e.__class__.__name__} ({e})')
        return -1
def set_element(key, search_col: str, set_col: str, set_val, ldb: list):
    """
    Set the value <set_val> to column <set_col> of the row found with <search_col> == <key>
    <key> and <set_val> are converted to str; a <set_val> of None is stored as ''.
    returns sid of record which has been modified
    returns (-1) if no record found or an invalid search column name is specified
    NOTE: Will hit on first match if multiples exist
    """
    if set_val is None:
        set_val = ''
    try:
        wanted = str(key)
        column = str(search_col)
        for line in ldb:
            if line[column] == wanted:
                line[str(set_col)] = str(set_val)
                return line[sid]
        return -1
    except Exception as e:
        # e.g. KeyError for a column name the records do not have
        print(f'{set_element.__name__} : {e.__class__.__name__} ({e})')
        return -1
def add_new_user(name: str, user_name, ldb: list):
    """
    Create a blank record for a new user and add it to <ldb>.

    <name> is stored in the discord_id column and <user_name> in the discord_name column; every other
    column except sid starts out as ''.
    The sid comes from next_avail_sid(). The record is inserted at list position (sid - 1), or appended
    when the sid is larger than the current number of records.
    Returns the sid (int) of the new record.
    """
    new_sid = next_avail_sid(ldb)
    record = dict.fromkeys(db_field_list, '')
    record[sid] = str(new_sid)
    record[discord_id] = name
    record[discord_name] = user_name
    if new_sid <= len(ldb):
        ldb.insert(new_sid - 1, record)
    else:
        ldb.append(record)
    return new_sid
def del_user(search_sid, ldb: list):
    """
    Remove the record whose sid equals <search_sid> from <ldb>.
    returns the list index the removed record occupied
    returns (-1) if no record with that sid exists (<ldb> is left unchanged)
    """
    # Find index in ldb corresponding to sid and remove
    index = get_index(str(search_sid), sid, ldb)
    if index < 0:
        return -1
    del ldb[index]
    return index
def next_avail_sid(ldb: list):
    """
    Return the lowest sid (int, counting from 1) which is not used by any record in <ldb>.

    Gaps left by deleted records are reused before a new highest sid is handed out.
    The result does not depend on the order of the records, so a sid which is already in use is
    never returned.
    """
    used = {int(row[sid]) for row in ldb}
    candidate = 1
    while candidate in used:
        candidate += 1
    return candidate
if __name__ == '__main__':
    # This module only provides database helpers; there is nothing to do when it is run directly
    pass