-
Notifications
You must be signed in to change notification settings - Fork 0
/
sql_query.py
301 lines (212 loc) · 9.44 KB
/
sql_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
import sqlite3
from Globals import Global
from Globals import GlobalConfig as Config
import os, os.path
import re # regex
class sql_query():
def __init__(self, db_name):
# make sure that the db root is set
if not Config.is_set( "db_root" ):
Config.set("db_root", "")
self.db_name = db_name
self.connection = None
self.cursor = None
def connect_db(self):
""" Connect to the SQLite DB, creates new if not exist """
self.connection = sqlite3.connect(Config.get("db_root")+self.db_name)
self.cursor = self.connection.cursor()
def destroy_database(self):
if os.path.exists( Config.get("db_root")+self.db_name ):
os.remove( Config.get("db_root")+self.db_name )
def close_db(self, commit = True):
"""Closes the db connection"""
if commit:
self.connection.commit()
self.connection.close()
def get_all_tables(self):
""" Gets an list of tuples with all table names in database
:return: list of table names [table_name, ...]
"""
query = "SELECT name FROM sqlite_master WHERE type='table'"
self.connect_db()
self.cursor.execute(query)
data = self.cursor.fetchall()
# clean the data removing the inner list of 1 element
for i in range (len(data)):
data[i] = data[i][0]
self.close_db()
return data
def get_table_columns(self, table_name):
""" Gets a list of tuples with all column data for table
:param table_name: table to get column data from
:return: [(col_id, col_name, type, can_be_null, default_value, part_of_primary_key)...]
"""
table_name = re.sub("\s", "_", table_name ) # replace white space with underscores
query = "pragma table_info("+table_name+")"
self.connect_db()
self.cursor.execute(query)
data = self.cursor.fetchall()
row_count = len(data)
self.close_db()
return data
def get_table_column_names(self, table_name):
""" Gets a list of all column names (in order)"""
table_name = re.sub("\s", "_", table_name) # replace white space with underscores
return [u[1] for u in self.get_table_columns(table_name)]
def table_exist(self, table_name, close_connect = True):
"""Check if table exist in database"""
query = "SELECT name FROM sqlite_master WHERE type='table' AND name=? "
self.connect_db()
table_name = re.sub("\s", "_", table_name) # replace white space with underscores
self.cursor.execute(query, (table_name,))
row_count = len( self.cursor.fetchall() )
if close_connect:
self.close_db()
return row_count
''' Check if table exist else creats it.
@:param table_name: Name of the table to be created.
@:param col_names: col string
'''
def add_table(self, table_name, col_names, data_types, data_lengths=None, default_values=None):
"""Adds new table to database
:param col_names: List of column names
:param data_types: list of data types (must match col names or none)
:param data_lengths: list of max column length (must match col names or none)
:param default_values: list of default values for column (must match col names or none)
"""
table_name = re.sub("\s", "_", table_name) # replace white space with underscores
if self.table_exist( table_name ):
print("Error: can not create table(", table_name, "), already exist")
return 404, "table already exist"
query = "CREATE TABLE "+table_name
columns = []
for i, v in enumerate(col_names):
data_l = ""
default_v = ""
if data_lengths is not None and data_lengths[i] != "":
data_l = "("+data_lengths[i]+")"
if default_values is not None and default_values[i] != "":
default_v = ' DEFAULT "'+default_values[i]+'"'
v = re.sub("\s", "_", v ) # replace white space with underscores
# make sure the first character is not a number
failed = False
try:
int(v[0])
except: # add underscore at start if it does
failed = True
if not failed:
v = "_"+v
columns.append( v +" "+ data_types[i] + data_l + default_v )
print(columns)
columns = ', '.join(columns)
self.connect_db()
query += "("+columns+")"
print(query)
self.cursor.execute(query)
self.close_db()
print("Table Created")
return None
def drop_table(self, table_name):
"""drops table from database"""
if not self.table_exist( table_name ):
print("Error: can not drop table, that does not already exist")
return
self.connect_db()
query = "DROP TABLE " + table_name
self.cursor.execute(query)
self.close_db()
def insert_row(self, table_name, value_columns, value_data):
"""Inserts rot into table"""
if not self.table_exist(table_name):
print("Error: can not insert row into table, table does not exist")
return
self.connect_db()
col_name_str = ', '.join(value_columns)
col_value_str = ', '.join(["?"] * len(value_data))
query = "INSERT INTO " + table_name + " (" + col_name_str + ") VALUES (" + col_value_str + ") "
print(query)
if Global.DEBUG:
print("query: ", query, "Data", value_data)
self.cursor.execute(query, value_data)
self.close_db()
print("data Inserted to table")
def remove_row(self, table_name, where_columns, where_data):
"""remove row from table"""
if not self.table_exist(table_name):
print("Error: can not delete row from table, table does not exist")
return
where_str = self.sql_string_builder( where_columns, "AND " )
self.connect_db()
query = " DELETE FROM "+table_name+" WHERE "+where_str
if Global.DEBUG:
print(query, where_data)
self.cursor.execute( query, where_data )
self.close_db()
def select_from_table(self, table_name, column_names, where_columns=[], where_data=[], order_data={}):
""" Selects rows of data from table
:param table_name: Name of table to select data from
:param column_names: list of column names to get data rom (* = all)
:param where_columns: list of where column names
:param where_data: list of where data (must match where column order)
:param order_data: dict of order data keys {"order_columns": list of strings, "sort_type": string (ASC || DESC)}
:return:
"""
if not self.table_exist(table_name):
print("Error: can not select from table, table does not exist")
return
# turn the lists of column names into a usable sql string
col_str = self.sql_string_builder( column_names, ",", False )
where_str = self.sql_string_builder(where_columns, "AND ")
# create the where string
if len(where_columns) > 0:
where_str = " WHERE "+where_str
else:
where_str = ""
# create the order by string
if order_data is not None and "order_columns" in order_data and \
"sort_type" in order_data and type(order_data["order_columns"] is list):
order_str = ', '.join(order_data["order_columns"])
order_str = "ORDER BY " + order_str + " " + order_data["sort_type"]
else:
order_str = ""
query = "SELECT " + col_str + " FROM " + table_name + where_str + order_str
print (query)
self.connect_db()
self.cursor.execute( query, where_data )
data = self.cursor.fetchall()
self.close_db()
return data
def update_row(self, table_name, set_columns, set_data, where_columns, where_data ):
""" Updates table row
:param table_name: Name of table to update
:param set_columns: list or tuple of columns to set
:param set_data: list or tuple of data to set into columns (order must match set_str)
:param where_columns: list or tuple of wheres
:param where_data: list or tuple of where data (order must match where_str)
:return:
"""
if not self.table_exist(table_name):
print("Error: can not update row in table, table does not exist")
return
set_str = self.sql_string_builder(set_columns, ",")
where_str = self.sql_string_builder(where_columns, "AND ")
data = (*set_data, *where_data)
self.connect_db()
query = "UPDATE "+table_name+" SET "+set_str+" WHERE "+where_str
print( query )
self.cursor.execute(query, data)
self.close_db()
def sql_string_builder(self, column_names, join, add_equals=True):
""" build a list of column names in to sql query string for set and where ect... """
string = ""
if add_equals is True:
equals = "=? "
else:
equals = " "
# make column string
for s in column_names:
string += s + equals + join
# clear end ','
if string[-len(join):] == join:
string = string[:-len(join)]
return string