#!/usr/bin/env python

import subprocess as sp, operator as op, collections as cs, pathlib as pl
import os, sys, re, io, time, fnmatch, json, signal, select, logging, logging.handlers

import systemd.journal as sdj # official systemd python bindings package


log = logging.getLogger('cglog')

class adict(dict):
	def __init__(self, *args, **kwargs):
		super().__init__(*args, **kwargs)
		self.__dict__ = self

def size_parse(size):
	if not size or not isinstance(size, str): return size
	if size[-1].isdigit(): return float(size)
	for u, u1 in reversed(list((u, 2 ** (n * 10)) for n, u in enumerate('BKMGT'))):
		if size[-1] == u: break
	else: raise ValueError('Unrecognized units: {} (value: {!r})'.format(size[-1], size))
	return float(size[:-1]) * u1
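# For example, size_parse('3M') returns 3145728.0 (3 * 2**20),
#  while a bare number like '8192' is passed through as 8192.0.
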
def journal_unit_tracker_iter(unit_pat, ev_map, unit_set=set()):
	'''Expects ts_max inputs for wait() delays,
		and yields same set of currently-active units matching wildcard.'''
	with sdj.Reader() as jj:
		jj.seek_tail()
		jj.get_previous()
		jjp, jjp_errs = select.poll(), 0
		for e in 'ERR HUP RDHUP NVAL'.split(): jjp_errs |= getattr(select, f'POLL{e}')
		jjp.register(jj, jj.get_events())
		for msg_id in ev_map:
			jj.add_match(f'MESSAGE_ID={msg_id.replace("-", "")}')
			jj.add_disjunction()
		while True:
			ts_max = yield unit_set
			if not ts_max: break
			while (delay := ts_max - time.monotonic()) > 0:
				if not (e := jjp.poll(delay * 1000)): continue # timeout
				if any(e[1] & jjp_errs for e in e): raise RuntimeError('sd-journal fd poll failed')
				if jj.process() != sdj.APPEND: continue
				for e in jj:
					if ( ('systemd', 1, 'init.scope') !=
							tuple(e.get(k) for k in ['_COMM', '_PID', '_SYSTEMD_UNIT']) ): continue
					if not (ev := ev_map.get(str(e.get('MESSAGE_ID', '')).replace('-', ''))): continue
					if not fnmatch.fnmatch(unit := e.get('UNIT', ''), unit_pat): continue
					if ev.start: unit_set.add(unit)
					if ev.stop: unit_set.discard(unit)
				break # to log start/stop immediately
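
# Note: journal_unit_tracker_iter is a generator coroutine - main() below primes
#  it with next() and then send()s a monotonic-clock deadline on each iteration;
#  it waits on the journal fd until that deadline (or until systemd logs new unit
#  start/stop messages) and yields the updated set of matching active units.
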
def poll_cg_stats(cg, dev_cache=dict()):
	stats, errs = dict(), 0
	for k in 'cpu', 'mem':
		if not (src := cg.get(k)): continue
		src.seek(0); s = src.read()
		for line in s.splitlines():
			try:
				lk, lv = line.decode().split(None, 1)
				stats[f'{k}.{lk}'] = int(lv)
			except: errs += 1
	if src := cg.get('io'):
		src.seek(0); s = src.read()
		for line in s.splitlines():
			try:
				dev, lvs = line.decode().split(None, 1)
				if dev in dev_cache: dev = dev_cache[dev]
				else:
					dev_key = dev # cache by maj:min key, not by resolved name
					try: dev = pl.Path(f'/dev/block/{dev}').resolve(True).name
					except OSError: pass
					dev_cache[dev_key] = dev
				for lv in lvs.split():
					lk, lv = lv.split('=', 1)
					stats[f'io.{dev}.{lk}'] = int(lv)
			except: errs += 1
	if errs > 2: # either random or parser bugs
		cg_files = ' '.join(f'{k}={src.name}' for k, src in cg.items())
		log.warning('Too many errors [%s] when parsing cg stats: %s', errs, cg_files)
	return stats
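
# Resulting keys mirror flattened cgroup-v2 stat files, e.g. cpu.usage_usec,
#  mem.anon or io.sda.rbytes ("sda" here assuming /dev/block/<maj:min> resolves
#  to that device name; otherwise the raw maj:min string is kept as-is).
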
def main(args=None):
	ev_log_bs, ev_log_n = '3M', 3

	import argparse, textwrap
	dd = lambda text: re.sub( r' \t+', ' ',
		textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
	parser = argparse.ArgumentParser(
		usage='%(prog)s [opts] log-file unit-fnmatch',
		formatter_class=argparse.RawTextHelpFormatter,
		description='Log cgroup stats and systemd events for specified unit files until stopped.')

	parser.add_argument('log_file',
		metavar=f'log_file[:bytes={ev_log_bs}[:backups={ev_log_n}]]',
		help=dd(f'''
			File to use for json data/events with an optional
				[:bytes[:backups]] suffix (default :{ev_log_bs}:{ev_log_n}).
			Can be e.g. %%3 to output to file descriptor 3, or '-' for stdout.
			Won't be rotated if it is a dev, fd or non-seekable; resolved to realpath if a symlink.'''))
	parser.add_argument('unit_fnmatch', help='fnmatch pattern for systemd unit name(s) to monitor.')

	parser.add_argument('-i', '--poll-interval',
		type=float, metavar='seconds', default=3.0,
		help='Interval between polling datapoints from unit(s) cgroup(s). Default: %(default).1fs')
	parser.add_argument('-n', '--new', action='store_true',
		help='Only monitor unit(s) that get started after this script, not pre-existing ones.')
	parser.add_argument('-s', '--stop', action='store_true',
		help=dd('''
			Stop after all monitored units exit.
			Can be used with --new to log data for a single unit start/stop lifecycle.'''))

	opts = parser.parse_args(args if args is not None else sys.argv[1:])

	logging.basicConfig(level=logging.WARNING, format='{levelname} :: {message}', style="{")

	ev_log_file = opts.log_file
	if ':' in ev_log_file:
		ev_log_file, ev_log_bs = ev_log_file.split(':', 1)
		if ':' in ev_log_bs: ev_log_bs, ev_log_n = ev_log_bs.split(':', 1)
	ev_log_bs, ev_log_n = size_parse(ev_log_bs), int(ev_log_n)
	ev_handler = None
	if ev_log_file == '-': ev_handler = logging.StreamHandler(sys.stdout)
	elif ev_log_file.startswith('%') and (fd := ev_log_file[1:]).isdigit():
		ev_handler = logging.StreamHandler(open(int(fd), 'a'))
	elif (p := pl.Path(ev_log_file)).exists() and not p.is_file():
		ev_handler = logging.FileHandler(ev_log_file)
	elif p.exists() and p.is_symlink(): p = p.resolve(True)
	if not ev_handler:
		try:
			with p.open('a') as dst: dst.tell()
		except io.UnsupportedOperation:
			log.warning( 'Specified log file path is'
				' not seekable, disabling rotation: %s', p )
			ev_handler = logging.FileHandler(p)
		else:
			ev_handler = logging.handlers.RotatingFileHandler(
				p, maxBytes=ev_log_bs, backupCount=ev_log_n )
	ev_log = logging.getLogger('cglog.ev')
	ev_handler.setLevel(logging.DEBUG)
	ev_handler.setFormatter(logging.Formatter('%(message)s'))
	ev_log.addHandler(ev_handler)
	ev_log.setLevel(logging.DEBUG)
	ev_log.propagate = False
	ev_log = lambda log=ev_log,**d: log.info('%s', json.dumps(dict(ts=time.time(), **d)))

	if new := opts.new: unit_set = set()
	else:
		unit_st = op.itemgetter('load', 'active', 'sub')
		unit_set = set( u['unit']
			for u in json.loads(sp.run(
				['systemctl', 'list-units', '-ao', 'json'],
				check=True, stdout=sp.PIPE ).stdout)
			if unit_st(u)[:2] == ('loaded', 'active') and
				fnmatch.fnmatch(u['unit'], opts.unit_fnmatch) )

	for sig in signal.SIGINT, signal.SIGTERM:
		signal.signal(sig, lambda sig,frm: sys.exit())

	tracker_ev_map = { # not sure if these stay same between releases, check journalctl
		'39f53479d3a045ac8e11786248231fbf': adict(start=1, stop=0),
		'7ad2d189f7e94e70a38c781354912448': adict(start=0, stop=1),
		'9d1aaa27d60140bd96365438aad20286': adict(start=0, stop=1) }
	tracker = journal_unit_tracker_iter(opts.unit_fnmatch, tracker_ev_map, unit_set)
	next(tracker)

	unit_cg_errs, unit_cg_errs_warn = cs.Counter(), 5
	unit_cg_map, cg_stats = dict(), dict(cpu='cpu.stat', mem='memory.stat', io='io.stat')
	ts_next = time.monotonic()

ts_next = time.monotonic()
while True:
unit_set_last = set(unit_set)
ts = time.monotonic()
while ts_next < ts: ts_next += opts.poll_interval
tracker.send(ts_next)
for u in set(unit_cg_map).difference(unit_set): del unit_cg_map[u] # stopped
for u in unit_set.difference(unit_cg_map): # new/started
cmd = sp.run(['systemctl', 'show', '-P', 'ControlGroup', u], stdout=sp.PIPE)
if cmd.returncode: unit_cg_errs[u] += 1; continue
p = pl.Path(f'/sys/fs/cgroup/{cmd.stdout.decode().strip()}')
cg = unit_cg_map[u] = dict()
for k, stat in cg_stats.items():
try: cg[k] = (p / stat).open('rb')
except OSError: pass # missing/disabled rc
for u in set(unit_cg_map).intersection(unit_cg_errs)\
.union(set(unit_cg_errs).difference(unit_set)): del unit_cg_errs[u] # old err-counters
for u, n in unit_cg_errs.most_common(1): # error count overflow
if n < unit_cg_errs_warn: continue
log.warning('Failed to get cgroup for unit after %s iterations: %s', n, u)
unit_set.discard(u); del unit_cg_errs[u]
dev_cache = dict()
for u in unit_set.difference(unit_set_last): ev_log(ev='start', u=u)
for u in unit_set_last.difference(unit_set): ev_log(ev='stop', u=u)
for u, cg in list(unit_cg_map.items()):
try: ev_log(ev='stat', u=u, **poll_cg_stats(cg, dev_cache))
except OSError: del unit_cg_map[u]; unit_cg_errs[u] += 1
if unit_set: new = False
elif not new and opts.stop: break
if __name__ == '__main__': sys.exit(main())
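
# Example run (hypothetical path and unit pattern, just to illustrate the interface):
#   ./systemd-cglog -n -s /tmp/nginx-cglog.json 'nginx*.service'
# would wait for any matching unit to start and append one JSON object per line
# to the log file, roughly like:
#   {"ts": 1700000000.0, "ev": "start", "u": "nginx.service"}
#   {"ts": 1700000003.0, "ev": "stat", "u": "nginx.service", "cpu.usage_usec": 12345, ...}
# then exit via -s/--stop once all matched units have stopped again.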