-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbase.py
384 lines (341 loc) · 14.6 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
import os
import shutil
import yaml
import datetime
import dill
from natsort import natsorted
class Run:
    """
    An instance of a script execution.

    The output folder and run name come from prepare_output(); the run also
    keeps its log, the methods callable next and a completion percentage
    (initialised to 0).
    """

    def __init__(self, foodcoop, configuration, log=None, name="", next_possible_methods=None):
        # Create (or reuse) the output folder first, as it determines path and name.
        self.path, self.name = prepare_output(foodcoop=foodcoop, configuration=configuration, name=name)
        self.foodcoop = foodcoop
        self.configuration = configuration
        # Falsy arguments (None, empty list) are replaced by a fresh list per instance.
        self.log = log or []
        self.next_possible_methods = next_possible_methods or []
        self.completion_percentage = 0

    def save(self):
        """Serialize this run with dill into 'run.obj' inside the run's folder."""
        target = os.path.join(self.path, "run.obj")
        with open(target, 'wb') as run_file:
            dill.dump(self, run_file)

    @classmethod
    def load(cls, path):
        """Return the object stored in 'run.obj' inside the folder *path*."""
        # NOTE(review): dill deserialization can execute arbitrary code; only load trusted run folders.
        source = os.path.join(path, "run.obj")
        with open(source, 'rb') as run_file:
            return dill.load(run_file)
class ScriptMethod:
    """
    A callable method/function in a script.

    Create ScriptMethod instances for all methods which should be callable by the user at some point,
    and put them into next_possible_methods at the end of a method after which they should be callable.
    """

    def __init__(self, name, inputs=None):
        self.name = name
        # A falsy value (None or an empty list) yields a fresh empty list.
        self.inputs = inputs or []
class LogEntry:
    """
    An entry of a run's log: What has been done by whom at what date and time.

    The timestamp is taken from the local clock when the entry is created.
    """

    def __init__(self, action, done_by=""):
        self.datetime = datetime.datetime.now()
        self.done_by = done_by
        self.action = action
class Variable:
    """
    A variable for a script, either drawn from the config file or the environment variables.

    Plain value holder: stores name, required flag, example and description as given.
    """

    def __init__(self, name, required=False, example=None, description=""):
        self.name, self.required = name, required
        self.example, self.description = example, description
class Input:
    """
    A variable for a script which has to be entered anew each time.
    """

    def __init__(self, name, required=False, input_format="", accepted_file_types=None, example=None, description=""):
        self.name = name
        self.required = required
        # "" (auto), "textarea", "file", "files"; and html input types like "text", "number", ...
        self.input_format = input_format
        # for example [".csv"], or [] for all; a falsy argument becomes a fresh empty list
        self.accepted_file_types = accepted_file_types or []
        # if not a file
        self.example = example
        self.description = description
class Category:
    """
    Nestable categories e.g. for articles, for usage within a script.

    Nesting is expressed through the subcategories list.
    """

    def __init__(self, number=None, name="", subcategories=None):
        self.number = number
        self.name = name
        # A falsy argument (None or an empty list) yields a fresh empty list.
        self.subcategories = subcategories or []
def prepare_string_for_comparison(string, case_sensitive, strip):
    """Return *string* normalised for comparison.

    The string is casefolded unless case_sensitive is true, and surrounding
    whitespace is removed when strip is true.
    """
    prepared = string if case_sensitive else string.casefold()
    return prepared.strip() if strip else prepared
def equal_strings_check(list1, list2, case_sensitive=False, strip=True):
    """Return the strings of list1 that also occur in list2 after normalisation.

    One result is produced per matching element of list1. When several
    elements of list1 normalise to the same value, the first of them is
    reported for each match.
    """
    normalized1 = [prepare_string_for_comparison(item, case_sensitive, strip) for item in list1]
    normalized2 = {prepare_string_for_comparison(item, case_sensitive, strip) for item in list2}
    # Map each normalised value to the first original string that produced it.
    first_original = {}
    for original, normalized in zip(list1, normalized1):
        first_original.setdefault(normalized, original)
    return [first_original[normalized] for normalized in normalized1 if normalized in normalized2]
def containing_strings_check(list1, list2, case_sensitive=False, strip=True):
    """Return the strings of list2 that are contained in any string of list1.

    Comparison happens on normalised strings. One result is produced per
    (list1 element, list2 element) hit, ordered by list1 first. When several
    elements of list2 normalise to the same value, the first of them is
    reported.
    """
    haystacks = [prepare_string_for_comparison(item, case_sensitive, strip) for item in list1]
    queries = [prepare_string_for_comparison(item, case_sensitive, strip) for item in list2]
    # Map each normalised query to the first original string that produced it.
    first_original = {}
    for original, query in zip(list2, queries):
        first_original.setdefault(query, original)
    return [first_original[query] for haystack in haystacks for query in queries if query in haystack]
def startswith_strings_check(list1, list2, case_sensitive=False, strip=True):
    """Return the strings of list2 that any string of list1 starts with.

    Comparison happens on normalised strings. One result is produced per
    (list1 element, list2 element) hit, ordered by list1 first. When several
    elements of list2 normalise to the same value, the first of them is
    reported.
    """
    candidates = [prepare_string_for_comparison(item, case_sensitive, strip) for item in list1]
    prefixes = [prepare_string_for_comparison(item, case_sensitive, strip) for item in list2]
    # Map each normalised prefix to the first original string that produced it.
    first_original = {}
    for original, prefix in zip(list2, prefixes):
        first_original.setdefault(prefix, original)
    return [first_original[prefix] for candidate in candidates for prefix in prefixes if candidate.startswith(prefix)]
def endswith_strings_check(list1, list2, case_sensitive=False, strip=True):
    """Return the strings of list2 that any string of list1 ends with.

    Comparison happens on normalised strings. One result is produced per
    (list1 element, list2 element) hit, ordered by list1 first. When several
    elements of list2 normalise to the same value, the first of them is
    reported.
    """
    candidates = [prepare_string_for_comparison(item, case_sensitive, strip) for item in list1]
    suffixes = [prepare_string_for_comparison(item, case_sensitive, strip) for item in list2]
    # Map each normalised suffix to the first original string that produced it.
    first_original = {}
    for original, suffix in zip(list2, suffixes):
        first_original.setdefault(suffix, original)
    return [first_original[suffix] for candidate in candidates for suffix in suffixes if candidate.endswith(suffix)]
def replace_in_string(string: str, strings_to_replace: dict) -> str:
    """Apply every replacement of strings_to_replace to *string*, in dict order.

    strings_to_replace must be a dictionary of {"string to replace": "by another string"}.
    Replacements are applied one after another, so later ones see the result of earlier ones.
    """
    for old, new in strings_to_replace.items():
        string = string.replace(old, new)
    return string
def remove_double_strings_loop(text, string, description=None, number_of_runs=100):
    """Collapse repeated occurrences of *string* in *text* into single ones.

    Doubled occurrences are replaced by one occurrence until none are left.
    If more than number_of_runs passes are needed, a warning and the current
    text are printed and the loop stops.

    description is used in the warning instead of the quoted string when given.
    Returns the (possibly only partially) cleaned text.
    """
    if not string:
        # An empty string is always "contained" and replacing it changes nothing,
        # so looping would only burn passes and emit a pointless warning.
        return text
    doubled = string + string
    loop_count = 0
    while doubled in text:
        text = text.replace(doubled, string)
        loop_count += 1
        if loop_count > number_of_runs:
            if not description:
                description = "'" + string + "'"
            # number_of_runs is an int and must be converted before concatenation.
            print("\nLoop to replace double " + description + " ran " + str(number_of_runs) + " times for following text:")
            print(text)
            break
    return text
def find_available_locales():
    """Return the distinct locale names found in the package folders under 'locales'.

    Every '*.yaml' file inside a subfolder of 'locales' contributes its name
    with '.yaml' removed. Names are listed in order of first discovery.
    """
    found = []
    for package in os.listdir("locales"):
        package_dir = os.path.join("locales", package)
        if os.path.isdir(package_dir):
            for entry in os.listdir(package_dir):
                if not entry.endswith(".yaml"):
                    continue
                if not os.path.isfile(os.path.join(package_dir, entry)):
                    continue
                locale = entry.replace(".yaml", "")
                if locale not in found:
                    found.append(locale)
    return found
def find_instances():
    """Return the names of all entries in 'data' that contain a 'settings.yaml' file.

    Returns an empty list when the 'data' folder does not exist.
    """
    root_path = "data"
    if not os.path.isdir(root_path):
        return []
    instances = []
    for entry in os.listdir(root_path):
        if os.path.isfile(os.path.join(root_path, entry, "settings.yaml")):
            instances.append(entry)
    return instances
def find_configurations(foodcoop):
    """Return the names of all subfolders of 'data/<foodcoop>'.

    Returns an empty list when the foodcoop folder does not exist, in line
    with find_instances() and get_outputs(), instead of raising.
    """
    root_path = os.path.join("data", foodcoop)
    if not os.path.isdir(root_path):
        return []
    return [d for d in os.listdir(root_path) if os.path.isdir(os.path.join(root_path, d))]
def read_config(foodcoop, configuration):
    """Load 'data/<foodcoop>/<configuration>/config.yaml'.

    Returns the parsed content, or an empty dict when the file is missing
    or empty.
    """
    filename = os.path.join("data", foodcoop, configuration, "config.yaml")
    if not os.path.isfile(filename):
        return {}
    with open(filename, encoding="UTF8") as yaml_file:
        config = yaml.safe_load(yaml_file)
    # safe_load yields None for an empty document; callers expect a mapping.
    return {} if config is None else config
def read_in_config(config, detail, alternative=None):
    """Return config[detail] if the key exists, otherwise *alternative*.

    Legacy code, use config.get(detail, alternative) instead.
    """
    return config[detail] if detail in config else alternative
def save_config(foodcoop, configuration, config):
    """Write *config* as YAML to 'data/<foodcoop>/<configuration>/config.yaml'.

    The configuration folder is created if necessary; key order is preserved.
    """
    directory = os.path.join("data", foodcoop, configuration)
    os.makedirs(directory, exist_ok=True)
    target = os.path.join(directory, "config.yaml")
    with open(target, "w", encoding="UTF8") as yaml_file:
        yaml.safe_dump(config, yaml_file, allow_unicode=True, indent=4, sort_keys=False)
def set_config_detail(foodcoop, configuration, detail, value):
    """Set a single key of a configuration's config.yaml and save the file.

    The existing configuration is loaded, config[detail] is set to *value*
    and the result is written back.
    """
    # Guard against a None result (e.g. an empty config.yaml) so the assignment cannot fail.
    config = read_config(foodcoop, configuration) or {}
    config[detail] = value
    save_config(foodcoop, configuration, config)
def rename_configuration(foodcoop, old_configuration_name, new_configuration_name):
    """Rename a configuration folder, merging into an existing target folder.

    If the target folder already exists, all entries of the old folder are
    moved into it; entries whose name is already taken get '*' appended until
    the name is free. Afterwards the path attribute of every stored run
    (folders containing 'run.obj') is updated to its new location.

    Returns new_configuration_name, or None if the old configuration does
    not exist.
    """
    if old_configuration_name not in find_configurations(foodcoop):
        return None
    if old_configuration_name == new_configuration_name:
        # Nothing to do; "merging" a folder into itself would rename every entry.
        return new_configuration_name

    # rename folder
    existing_output_path = output_path(foodcoop, old_configuration_name)
    new_output_path = output_path(foodcoop, new_configuration_name)
    if os.path.exists(new_output_path):
        # TODO: ask if user really wants to merge configurations and which config.yaml should be kept;
        # then keep that config.yaml resp. overwrite it, instead of creating config.yaml*
        for entry_name in os.listdir(existing_output_path):
            new_entry_name = entry_name
            while os.path.exists(os.path.join(new_output_path, new_entry_name)):
                new_entry_name += "*"
            os.rename(os.path.join(existing_output_path, entry_name), os.path.join(new_output_path, new_entry_name))
        os.rmdir(existing_output_path)
    else:
        os.rename(existing_output_path, new_output_path)

    # update path attribute in run objects
    for run_folder in os.listdir(new_output_path):
        run_path = os.path.join(new_output_path, run_folder)
        if not os.path.isfile(os.path.join(run_path, "run.obj")):
            continue  # not a folder holding a stored run
        run = Run.load(path=run_path)
        # The path must point at the run's own folder, because save() writes run.obj into it.
        run.path = run_path
        # NOTE(review): run.configuration still holds the old configuration name — confirm whether it should be updated too.
        run.save()
    return new_configuration_name
def delete_configuration(foodcoop, configuration):
    """Delete the folder 'data/<foodcoop>/<configuration>' including its content.

    Returns a tuple (success, feedback):
    (None, None) if the path does not exist, (True, None) after a successful
    deletion, and (False, <error message>) if deleting raised an OSError.
    """
    target = os.path.join("data", foodcoop, configuration)
    if not os.path.exists(target):
        return None, None
    try:
        shutil.rmtree(target)
    except OSError as error:
        return False, f"Error: {target} : {error.strerror}"
    return True, None
def read_settings(foodcoop):
    """Load 'data/<foodcoop>/settings.yaml'.

    The foodcoop folder is created if it does not exist yet. When the
    settings file is missing or empty, default settings are returned.
    """
    settings_path = os.path.join("data", foodcoop)
    os.makedirs(settings_path, exist_ok=True)
    filename = os.path.join(settings_path, "settings.yaml")
    settings = None
    if os.path.isfile(filename):
        with open(filename, encoding="UTF8") as yaml_file:
            settings = yaml.safe_load(yaml_file)
    if settings is None:
        # Missing file, or empty document (safe_load returns None): fall back to defaults.
        settings = {
            "default_locale": "de_AT",
            "configuration_groups": {
            }
        }
    return settings
def save_settings(foodcoop, settings):
    """Write *settings* as YAML to 'data/<foodcoop>/settings.yaml'.

    The foodcoop folder is created if necessary; key order is preserved.
    """
    directory = os.path.join("data", foodcoop)
    os.makedirs(directory, exist_ok=True)
    target = os.path.join(directory, "settings.yaml")
    with open(target, "w", encoding="UTF8") as yaml_file:
        yaml.safe_dump(settings, yaml_file, allow_unicode=True, indent=4, sort_keys=False)
def set_setting(foodcoop, setting, value):
    """Set a single key of a foodcoop's settings and save them.

    The existing settings are loaded, settings[setting] is set to *value*
    and the result is written back.
    """
    settings = read_settings(foodcoop)
    # The key is the 'setting' parameter; the name 'detail' used before is not defined in this function.
    settings[setting] = value
    save_settings(foodcoop, settings)
def read_locales(foodcoop, locale=None):
    """Load the translation file of every package folder under 'locales'.

    If no locale is given, the foodcoop's "default_locale" setting is used.
    Per package the file is chosen in this order: '<locale>.yaml', 'en.yaml',
    otherwise the alphabetically first '*.yaml' file. Packages without any
    YAML file are skipped.

    Returns a dict mapping package name to the parsed YAML content.
    """
    if not locale:
        locale = read_settings(foodcoop)["default_locale"]
    locales = {}
    for package in os.listdir("locales"):
        package_path = os.path.join("locales", package)
        if not os.path.isdir(package_path):
            continue
        # The files on disk carry the '.yaml' suffix, so the existence check must include it.
        for candidate in (locale + ".yaml", "en.yaml"):
            if os.path.isfile(os.path.join(package_path, candidate)):
                locale_package = candidate
                break
        else:
            # Deterministic fallback: first YAML file by name, ignoring folders and other files.
            yaml_files = sorted(
                f for f in os.listdir(package_path)
                if f.endswith(".yaml") and os.path.isfile(os.path.join(package_path, f))
            )
            if not yaml_files:
                continue
            locale_package = yaml_files[0]
        with open(os.path.join(package_path, locale_package), encoding="UTF8") as yaml_file:
            locales[package] = yaml.safe_load(yaml_file)
    return locales
def output_path(foodcoop, configuration):
    """Return the relative folder path 'data/<foodcoop>/<configuration>'."""
    parts = ("data", foodcoop, configuration)
    return os.path.join(*parts)
def get_outputs(foodcoop, configuration):
    """Return the naturally sorted names of all subfolders of a configuration's output folder.

    Returns an empty list when the output folder does not exist.
    """
    path = output_path(foodcoop, configuration)
    if not os.path.exists(path):
        return []
    folders = []
    for entry in natsorted(os.listdir(path)):
        if os.path.isdir(os.path.join(path, entry)):
            folders.append(entry)
    return folders
def get_file_path(foodcoop, configuration, run, folder, ending="", notifications=None):
    """Return the path of a run's subfolder, or of a file with a given ending inside it.

    Without *ending* (or if the folder does not exist) the folder path itself
    is returned. Otherwise the first file in the folder whose name ends with
    *ending* is returned; if there are several, a warning is added to the
    notifications. If no file matches, the folder path is returned.

    Returns a tuple (path, notifications).
    """
    notifications = notifications or []
    path = os.path.join(output_path(foodcoop, configuration), run, folder)
    if not (ending and os.path.isdir(path)):
        return path, notifications
    matching = [os.path.join(path, entry) for entry in os.listdir(path) if entry.endswith(ending)]
    if matching:
        if len(matching) > 1:
            notifications.append("Warning: Multiple files found for " + run)
        path = matching[0]
    return path, notifications
def list_categories(locales, categories):
    """Return a text listing of categories, one line per category.

    Each line starts with '#', followed by the category number (if truthy),
    a space and the name (if truthy) and, if there are subcategories, their
    comma-separated names in parentheses, introduced by the localized
    "incl. subcategories" label from locales["base"].
    """
    lines = []
    for category in categories:
        line = "#"
        if category.number:
            line += str(category.number)
        if category.name:
            line += " " + category.name
        if category.subcategories:
            subcategory_names = ", ".join(sub.name for sub in category.subcategories)
            line += f' ({locales["base"]["incl. subcategories"]} {subcategory_names})'
        lines.append(line + "\n")
    return "".join(lines)
def prepare_output(foodcoop, configuration, name=""):
    """Create the output folder for a run and return (path, name).

    The folders 'data', 'data/<foodcoop>' and 'data/<foodcoop>/<configuration>'
    are created if necessary. With a *name*, that subfolder is created (or
    reused). Without one, the name is today's ISO date plus '_<number>',
    using the lowest number for which no folder exists yet.
    """
    path = ""
    for part in ("data", foodcoop, configuration):
        path = os.path.join(path, part)
        os.makedirs(path, exist_ok=True)
    if not name:
        today = datetime.date.today().isoformat()
        number = 1
        # Find the first free '<date>_<number>' folder name.
        while os.path.isdir(os.path.join(path, today) + "_" + str(number)):
            number += 1
        name = today + "_" + str(number)
    path = os.path.join(path, name)
    os.makedirs(path, exist_ok=True)
    return path, name
def file_path(path, folder, file_name):
    """Return the path of file_name inside path/folder, creating that folder if needed."""
    directory = os.path.join(path, folder)
    os.makedirs(directory, exist_ok=True)
    return os.path.join(directory, file_name)
def write_txt(file_path, content):
    """Write *content* as UTF-8 text to the file '<file_path>.txt', replacing existing content."""
    target = file_path + ".txt"
    with open(target, mode="w", encoding="UTF8") as txt_file:
        txt_file.write(content)
def full_user_name(session):
    """Return the full name of the session user.

    The name is built from first_name and last_name of the session's
    foodsoft_connector, separated by a space and stripped. Without a
    connector, "Unbekannt" is returned.
    """
    connector = session.foodsoft_connector
    if not connector:
        # TODO: check if session has automated task -> return task name (?)
        return "Unbekannt"
    name_parts = (connector.first_name, connector.last_name)
    return " ".join(name_parts).strip()