-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig.py
executable file
·416 lines (374 loc) · 16 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
#!/usr/bin/env python
"""
configuration for faps
Provides the Options class that will transparently handle the different option
sources through the .get() method. Pulls in defaults, site and job options plus
command line customisation. Instantiating Options will set up the logging for
the particular job.
"""
__all__ = ['Options']
# Python 3 fix
try:
import configparser
except ImportError:
import ConfigParser as configparser
import copy
import logging
import os
import re
import sys
import textwrap
# Python 3 fix
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from optparse import OptionParser
from logging import debug, error, info
import __main__
class Options(object):
"""
Transparent options handling.
A single unified way of dealing with input files and command line options
delivering sensible defaults for unspecified values. Access options with
the .get() method, or the method that specifies the expected type. It is
recommended to replace with a new instance each time the script is run,
otherwise commandline options or changed input files will not be picked up.
"""
def __init__(self, job_name=None):
"""Initialize options from all .ini files and the commandline."""
# use .get{type}() to read attributes, only access args directly
self.job_dir = ''
self.script_dir = ''
self.job_name = job_name
self.args = []
self.options = {}
self.cmdopts = {}
self._used_options = set()
self.defaults = configparser.SafeConfigParser()
self.site_ini = configparser.SafeConfigParser()
self.job_ini = configparser.SafeConfigParser()
# populate options
self._init_paths()
self.commandline()
self._init_logging()
self.load_defaults()
self.load_site_defaults()
self.load_job_defaults()
if self.options.job_type:
self.job_type_ini = configparser.SafeConfigParser()
self.load_job_type(self.options.job_type)
else:
self.job_type_ini = NullConfigParser()
def get(self, item):
"""Map values from different sources based on priorities."""
# report default options differently
option_source = 'D'
if item in self.__dict__:
# Instance attributes, such as job_name and job_dir
debug("an attribute: %s" % item)
option_source = 'A'
value = object.__getattribute__(self, item)
elif self.options.__dict__.get(item) is not None:
# Commandline options from optparse where option is set
debug("an option: %s" % item)
option_source = 'C'
value = self.options.__dict__[item]
elif item in self.cmdopts:
# Commandline -o custom key=value options
debug("a custom -o option: %s" % item)
option_source = 'O'
value = self.cmdopts[item]
elif self.job_ini.has_option('job_config', item):
# jobname.fap per-job setings
debug("a job option: %s" % item)
option_source = 'F'
value = self.job_ini.get('job_config', item)
elif self.job_type_ini.has_option('job_type', item):
debug("a job_type option: %s" % item)
option_source = 'J'
value = self.job_type_ini.get('job_type', item)
elif self.site_ini.has_option('site_config', item):
debug("a site option: %s" % item)
value = self.site_ini.get('site_config', item)
elif self.defaults.has_option('defaults', item):
debug("a default: %s" % item)
value = self.defaults.get('defaults', item)
else:
# Most things have a default, but not always. Error properly.
debug("unspecified option: %s" % item)
raise AttributeError(item)
# Show what options are used the first time they are accessed
# for the traceability
if item not in self._used_options:
if option_source == 'D':
debug("Default: %s = %s" % (item, value))
else:
info("Option (%s): %s = %s" % (option_source, item, value))
self._used_options.add(item)
# we output the raw value here and pass to caller for
return value
def getbool(self, item):
"""
Parse option and if the value of item is not already a bool return
True for "1", "yes", "true" and "on" and False for "0", "no", "false"
and "off". Case-insensitive.
"""
value = self.get(item)
if isinstance(value, bool):
return value
# Can't use isinstance with basestring to be 2.x and 3.x compatible
# fudge it by assuming strings can be lowered
elif hasattr(value, 'lower'):
if value.lower() in ["1", "yes", "true", "on"]:
return True
elif value.lower() in ["0", "no", "false", "off"]:
return False
else:
# Not a valid bool
raise ValueError(value)
else:
return bool(item)
def getint(self, item):
"""Return item's value as an integer."""
value = self.get(item)
return int(value)
def getfloat(self, item):
"""Return item's value as a float."""
value = self.get(item)
return float(value)
def gettuple(self, item, dtype=None):
"""Return item's value interpreted as a tuple of 'dtype' [strings]."""
value = self.get(item)
# Regex strips bracketing so can't nest, but safer than eval
value = [x for x in re.split('[\s,\(\)\[\]]*', value) if x]
if dtype is not None:
return tuple([dtype(x) for x in value])
else:
return tuple(value)
def _init_paths(self):
"""Find the script directory and set up working directory"""
# Where the script is has the config defaults.
if __name__ != '__main__':
self.script_dir = os.path.dirname(__file__)
else:
self.script_dir = os.path.abspath(sys.path[0])
# Where we run the job.
self.job_dir = os.getcwd()
def _init_logging(self):
"""
Setup the logging to terminal and .flog file, with levels as required.
Must run before any logging calls so we need to access attributes
rather than using self.get()!
"""
# Quiet always overrides verbose; always at least INFO in .flog
if self.options.silent:
stdout_level = logging.CRITICAL
file_level = logging.INFO
elif self.options.quiet:
stdout_level = logging.ERROR
file_level = logging.INFO
elif self.options.verbose:
stdout_level = logging.DEBUG
file_level = logging.DEBUG
else:
stdout_level = logging.INFO
file_level = logging.INFO
# Easier to do simple file configuration then add the stdout
logging.basicConfig(level=file_level,
format='[%(asctime)s] %(levelname)s %(message)s',
datefmt='%Y%m%d %H:%M:%S',
filename=self.job_name + '.flog',
filemode='a')
# Make these uniform widths
logging.addLevelName(10, '--')
logging.addLevelName(20, '>>')
logging.addLevelName(30, '**')
logging.addLevelName(40, '!!')
logging.addLevelName(50, 'XX')
if self.options.plain:
console = logging.StreamHandler(sys.stdout)
else:
# Use nice coloured console output
console = ColouredConsoleHandler(sys.stdout)
console.setLevel(stdout_level)
formatter = logging.Formatter('%(levelname)s %(message)s')
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
def commandline(self):
"""Specified options, highest priority."""
usage = "usage: %prog [options] [COMMAND] JOB_NAME"
# use description for the script, not for this module
parser = OptionParser(usage=usage, version="%prog 0.1",
description=__main__.__doc__)
parser.add_option("-v", "--verbose", action="store_true",
dest="verbose",
help="output extra debugging information")
parser.add_option("-q", "--quiet", action="store_true",
dest="quiet", help="only output warnings and errors")
parser.add_option("-s", "--silent", action="store_true",
dest="silent", help="no terminal output")
parser.add_option("-p", "--plain", action="store_true",
dest="plain", help="do not colourise or wrap output")
parser.add_option("-o", "--option", action="append", dest="cmdopts",
help="set custom options as key=value pairs")
parser.add_option("-i", "--interactive", action="store_true",
dest="interactive", help="enter interactive mode")
parser.add_option("-m", "--import", action="store_true",
dest="import", help="try and import old data")
parser.add_option("-n", "--no-submit", action="store_true",
dest="no_submit",
help="create input files only, do not run any jobs")
parser.add_option("-j", "--job-type", dest="job_type",
help="user preconfigured job settings")
parser.add_option("-d", "--daemon", action="store_true", dest="daemon",
help="run [lube] as a server and await input")
(local_options, local_args) = parser.parse_args()
# job_name may or may not be passed or set initially
if self.job_name:
if self.job_name in local_args:
local_args.remove(self.job_name)
elif len(local_args) == 0:
parser.error("No arguments given (try %prog --help)")
else:
# Take the last argument as the job name
self.job_name = local_args.pop()
# key value options from the command line
if local_options.cmdopts is not None:
for pair in local_options.cmdopts:
if '=' in pair:
pair = pair.split('=', 1) # maximum of one split
self.cmdopts[pair[0]] = pair[1]
else:
self.cmdopts[pair] = True
self.options = local_options
# Args are only the COMMANDS for the run
self.args = [arg.lower() for arg in local_args]
def load_defaults(self):
"""Load program defaults."""
# ConfigParser requires header sections so we add them to a StringIO
# of the file if they are missing. 2to3 should also deal with the
# renamed modules.
default_ini_path = os.path.join(self.script_dir, 'defaults.ini')
try:
filetemp = open(default_ini_path, 'r')
default_ini = filetemp.read()
filetemp.close()
if not '[defaults]' in default_ini.lower():
default_ini = '[defaults]\n' + default_ini
default_ini = StringIO(default_ini)
except IOError:
# file does not exist so we just use a blank string
debug('Default options not found! Something is very wrong.')
default_ini = StringIO('[defaults]\n')
self.defaults.readfp(default_ini)
def load_site_defaults(self):
"""Find where the script is and load defaults"""
site_ini_path = os.path.join(self.script_dir, 'site.ini')
try:
filetemp = open(site_ini_path, 'r')
site_ini = filetemp.read()
filetemp.close()
if not '[site_config]' in site_ini.lower():
site_ini = '[site_config]\n' + site_ini
site_ini = StringIO(site_ini)
except IOError:
# file does not exist so we just use a blank string
debug("No site options found; using defaults")
site_ini = StringIO('[site_config]\n')
self.site_ini.readfp(site_ini)
def load_job_defaults(self):
"""Find where the job is running and load defaults"""
job_ini_path = os.path.join(self.job_dir, self.job_name + '.fap')
try:
filetemp = open(job_ini_path, 'r')
job_ini = filetemp.read()
filetemp.close()
if not '[job_config]' in job_ini.lower():
job_ini = '[job_config]\n' + job_ini
job_ini = StringIO(job_ini)
debug("Job options read from %s" % job_ini_path)
except IOError:
# file does not exist so we just use a blank string
debug("No job options found; using defaults")
job_ini = StringIO('[job_config]\n')
self.job_ini.readfp(job_ini)
def load_job_type(self, job_type):
"""Find where the job is running and load defaults"""
home_dir = os.path.expanduser('~')
job_type_ini_path = os.path.join(home_dir, '.faps', job_type + '.fap')
try:
filetemp = open(job_type_ini_path, 'r')
job_type_ini = filetemp.read()
filetemp.close()
if not '[job_type]' in job_type_ini.lower():
job_type_ini = '[job_type]\n' + job_type_ini
job_type_ini = StringIO(job_type_ini)
debug("Job type options read from %s" % job_type_ini_path)
except IOError:
# file does not exist so we just use a blank string
error("Job type '%s' specified but options file '%s' not found" %
(job_type, job_type_ini_path))
job_type_ini = StringIO('[job_config]\n')
self.job_type_ini.readfp(job_type_ini)
def options_test():
"""Try and read a few options from different sources."""
testopts = Options()
print(testopts.get('job_name'))
print(testopts.get('cmdopts'))
print(testopts.get('args'))
print(testopts.get('verbose'))
print(testopts.get('script_dir'))
print(testopts.getbool('interactive'))
for arg in testopts.get('args'):
print('%s: %s' % (arg, testopts.get(arg)))
try:
print(testopts.getbool(arg))
except ValueError:
print('%s is not a bool' % arg)
try:
print(testopts.getint(arg))
except ValueError:
print('%s is not an int' % arg)
try:
print(testopts.getfloat(arg))
except ValueError:
print('%s is not a float' % arg)
try:
print(testopts.gettuple(arg))
except ValueError:
print('%s is not a tuple' % arg)
print(testopts.get('not an option'))
class ColouredConsoleHandler(logging.StreamHandler):
"""Makes colourised and wrapped output for the console."""
def emit(self, record):
"""Colourise and emit a record."""
# Need to make a actual copy of the record
# to prevent altering the message for other loggers
myrecord = copy.copy(record)
levelno = myrecord.levelno
if levelno >= 50: # CRITICAL / FATAL
front = '\033[30;41m' # black/red
elif levelno >= 40: # ERROR
front = '\033[30;41m' # black/red
elif levelno >= 30: # WARNING
front = '\033[30;43m' # black/yellow
elif levelno >= 20: # INFO
front = '\033[30;42m' # black/green
elif levelno >= 10: # DEBUG
front = '\033[30;46m' # black/cyan
else: # NOTSET and anything else
front = '\033[0m' # normal
myrecord.levelname = '%s%s\033[0m' % (front, myrecord.levelname)
logging.StreamHandler.emit(self, myrecord)
class NullConfigParser(object):
"""Use in place of a blank ConfigParser that has no options."""
def __init__(self, *args, **kwargs):
"""This is empty, so do nothing."""
pass
def has_option(*args, **kwargs):
"""Always return Fasle as there are no options."""
return False
if __name__ == '__main__':
options_test()