-
Notifications
You must be signed in to change notification settings - Fork 34
/
pacman-fsck
executable file
·146 lines (129 loc) · 6.39 KB
/
pacman-fsck
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python
import subprocess as sp, pathlib as pl, hashlib as hl
import os, sys, enum, re, codecs
err_fmt = lambda err: '[{}] {}'.format(err.__class__.__name__, err)
def p_err(fmt, *a, prefix='ERROR: ', **k):
    '''Print an error message line to stderr, with prefix prepended to it.

    fmt is expanded via str.format() with a/k only if any of those are passed,
    otherwise it is printed as-is, so that literal braces in it are left alone.
    '''
    msg = fmt.format(*a, **k) if a or k else fmt
    print(prefix + msg, file=sys.stderr)
class adict(dict):
    '''dict subclass where keys can also be read and assigned as instance attributes.'''

    def __init__(self, *args, **kwargs):
        # Accepts same arguments as dict() - mapping, iterable of pairs, keywords
        super().__init__(*args, **kwargs)
        # Instance attribute storage is the dict itself, so d.key and d['key'] are same thing
        self.__dict__ = self
def str_decode_escapes( s,
        _re_esc=re.compile(r'''
            ( \\U........ # 8-digit hex escapes
            | \\u.... # 4-digit hex escapes
            | \\N\{[^}]+\} # Unicode characters by name
            | \\[\\'"abfnrtv] # Single-character escapes
            )''', re.VERBOSE),
        _dec_esc=lambda m: codecs.decode(m.group(0), 'unicode-escape'),
        _re_oct=re.compile(r'\\[0-7]{1,3}'),
        _dec_oct=lambda m: rf'\x{int(m.group(0)[1:], 8):02x}',
        _re_hex=re.compile(br'\\x[\da-fA-F]{2}'),
        _dec_hex=lambda m: int(m.group(0)[2:], 16).to_bytes(1, 'big') ):
    '''Decode backslash-escapes in string s and return resulting string.

    Decoding is done in three passes:
     - Unicode (\\U, \\u, \\N{...}) and single-character escapes are decoded as-is.
     - Octal escapes (1-3 digits) are rewritten into two-digit \\xNN hex escapes.
     - String is encoded to bytes, where each \\xNN escape is replaced by that byte,
        and then result is decoded back to str, so that multi-byte utf-8 sequences
        spelled out as separate byte-escapes end up as one character.

    Underscore-prefixed parameters are precompiled patterns and substitution
     callbacks stored as defaults - presumably not intended to be passed by callers.
    Raises UnicodeDecodeError if resulting bytes are not valid utf-8.
    '''
    s = _re_esc.sub(_dec_esc, s) # unicode escapes
    # Octal values must be zero-padded to two hex digits, as _re_hex only matches
    #  exactly two, i.e. \011 (tab) has to become \x09, and would be left as-is with \x9
    s = _re_oct.sub(_dec_oct, s) # octal -> hex escapes
    return _re_hex.sub(_dec_hex, s.encode()).decode() # hex escapes -> utf-8
class errs(enum.Enum):
    # Types of errors that are passed to error-reporting callback
    nx = 1 # missing file
    access = 2 # failed to access file
    nohash = 3 # no known digests on mtree line
    parse = 4 # mtree read/parse error
class SkipCheck(Exception):
    '''Raised in mtree line processing when path matches one of the skip-regexps.'''
def validate_mtree_files(p_err, p_mtree, root, skip_res, bs=1 * 2**20):
    '''Check digests of files under root against entries listed in one mtree file.

    p_err: error-reporting callback, called as p_err(err_type, fmt, *fmt_args),
        where err_type is one of errs values, or None for digest mismatches.
    p_mtree: path object of mtree file, which is read through "bsdcat" command
        (presumably to have it decompressed transparently - confirm against mtree file format).
    root: path object that mtree-relative file paths are appended to.
    skip_res: iterable of compiled regexps, matched via search() against
        "/"-prefixed decoded mtree paths - any match skips checking that path.
    bs: chunk size in bytes for reading checked files.

    First digest found on each line, in sha512, sha256, md5 order, is the only one checked.
    Problems with individual lines/files are reported via p_err and do not stop the run.
    Errors from running bsdcat are raised to the caller, including
        subprocess.CalledProcessError when it exits with non-zero code.
    '''
    # check=True so that bsdcat failure is not mistaken for an empty (always-valid) mtree
    mtree = sp.run( ['bsdcat', p_mtree.resolve()],
        stdout=sp.PIPE, check=True ).stdout.decode(errors='replace')

    for line in mtree.splitlines():
        if not (line := line.strip()): continue
        # Skip header, /set defaults and all dot-prefixed entries at the top level
        if line == '#mtree' or line.startswith('./.') or line.startswith('/set '): continue

        try:
            p, attrs = line.split(None, 1)
            p_chk = '/' + (p := str_decode_escapes(p).lstrip('./'))
            for rx in skip_res:
                if rx.search(p_chk): raise SkipCheck
            p, attrs = root / p, adict(a.split('=', 1) for a in attrs.split())
        except SkipCheck: continue
        except Exception as err:
            p_err(errs.parse, 'Unrecognized mtree line [ {} ]: {!r} - {}', p_mtree, line, err_fmt(err))
            continue
        if attrs.get('type') is not None: continue # regular files don't have that attr

        for k in 'sha512', 'sha256', 'md5':
            if not (digest_chk := attrs.get(f'{k}digest')): continue
            digest, digest_chk = getattr(hl, k)(), digest_chk.lower()
            try:
                with open(p, 'rb') as src:
                    fd = src.fileno()
                    try:
                        # Hints that file is read once, from start to end
                        os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_SEQUENTIAL)
                        os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_NOREUSE)
                        while buff := src.read(bs): digest.update(buff)
                    finally:
                        # Hint that file data won't be needed again, sent for partial reads too
                        os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)
                if digest.hexdigest().lower() != digest_chk:
                    p_err(None, 'pkg/mtree file digest mismatch [ {} ]: {}', p_mtree, p)
            except FileNotFoundError:
                p_err(errs.nx, 'Missing pkg/mtree file [ {} ]: {}', p_mtree, p)
            except OSError as err:
                p_err( errs.access,
                    'Failed to access pkg/mtree file [ {} ]: {} - {}', p_mtree, p, err_fmt(err) )
            break # only the first available digest is checked
        else: p_err(errs.nohash, 'No known digests on mtree file-line [ {} ]: {}', p_mtree, line)
def main(args=None):
    '''Command-line entry point - validate rootfs files against pacman mtree checksums.

    args: list of command-line arguments, sys.argv[1:] is used if it is None.
    Returns exit code - 0 if no errors were reported (not counting
        ones skipped via -s/--skip-err option), and 37 otherwise.
    Invalid option values are reported via parser.error(), which exits the script.
    '''
    import argparse
    parser = argparse.ArgumentParser(
        description='Validate checksums of rootfs files against mtree files'
            ' in pacman-local-repo dir, reporting any detected mismatches.')
    parser.add_argument('-r', '--root',
        metavar='dir', default='/', help='Rootfs dir to check. Default: %(default)s')
    parser.add_argument('-p', '--pacman-path',
        metavar='dir', default='/var/lib/pacman/local',
        help='Pacman "local" repo dir, specific pkg'
            ' subdir there, or path to an mtree file. Default: %(default)s')
    parser.add_argument('-s', '--skip-err', action='append', metavar='err[ err...]',
        help='Space-separated list of detected error types to skip.'
            ' Recognized error types: nx (missing file), access (failed to access file),'
                ' nohash (no known digests), parse (any kind of mtree read/parse error).'
            ' Option can be specified multiple times, all specified errors will be skipped.')
    parser.add_argument('-x', '--skip-prefix', action='append', metavar='prefix[ prefix...]',
        help='Space-separated list of pkg/mtree path prefixes to skip checking.'
            ' Prefix should not include path specified by -r/--root option.'
            ' For example, "/etc/" can be specified to skip checking all config files there.'
            ' Option can be specified multiple times, all specified prefixes will be skipped.'
            ' Don\'t forget to include trailing slash to match prefix-dirs only.')
    parser.add_argument('-X', '--skip-re', action='append', metavar='regexp',
        help='Filename match regexp (python re syntax), same to -x, but more powerful.'
            ' Filenames do not include -r/--root path and always start with "/".'
            ' Multiple regexps can be specified via multiple uses of this option.')
    parser.add_argument('-v', '--verbose', action='store_true',
        help='Print alpha-sorted list of mtree files as they get processed'
            ' with sequence number to stdout, to get an idea on operation progress.')
    parser.add_argument('-P', '--err-prefix', metavar='prefix',
        help='Additional prefix to prepend to all error msgs to mark/identify specific run output.')
    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    exit_code = 0

    try: err_skip = {errs[k] for k in ' '.join(opts.skip_err or []).lower().split()}
    except KeyError as err:
        parser.error(f'Unrecognized error type specified for -s/--skip-err option: {err}')

    p_err_prefix = {} if not opts.err_prefix else {'prefix': f'ERROR: {opts.err_prefix} :: '}
    def p_err_filter(t, fmt, *fmt_args, **fmt_kws):
        # Prints error and sets non-zero exit code, unless error type t was disabled via -s
        nonlocal exit_code
        if t not in err_skip:
            exit_code = 37
            p_err(fmt, *fmt_args, **fmt_kws, **p_err_prefix)

    skip_res = set()
    try:
        # Each -X value is one regexp, used as-is, so that it can have spaces in it
        for s in opts.skip_re or []: skip_res.add(re.compile(s))
    except re.error as err:
        parser.error(f'Invalid regexp specified for -X/--skip-re option [ {s} ]: {err}')
    for pre in ' '.join(opts.skip_prefix or []).split():
        skip_res.add(re.compile(r'^/' + re.escape(pre.lstrip('/'))))

    root, mtrees = pl.Path(opts.root), pl.Path(opts.pacman_path)
    if not mtrees.exists():
        # Otherwise mistyped path would produce no checks and a successful exit code
        parser.error(f'Path specified for -p/--pacman-path option does not exist: {mtrees}')
    if mtrees.is_file(): mtrees = [mtrees]
    elif (mtrees / 'mtree').exists(): mtrees = [mtrees / 'mtree']
    else: mtrees = sorted(mtrees.glob('*/mtree'))

    # Sequence number is padded to the width of total count of mtree files
    progress_fmt = ( '' if not opts.verbose else
        f'[{{:{len(str(len(mtrees)))},d}} / {len(mtrees)}] {{}}' )
    for n, p_mtree in enumerate(mtrees, 1):
        if progress_fmt: print(progress_fmt.format(n, p_mtree))
        try: validate_mtree_files(p_err_filter, p_mtree, root, skip_res)
        except Exception as err:
            # Failure on one mtree is reported as a parse-error, without stopping the run
            p_err_filter( errs.parse,
                'Failed to read/validate mtree file [ {} ]: {}', p_mtree, err_fmt(err) )

    return exit_code
# Run as a script - exit code is whatever main() returns
if __name__ == '__main__':
    sys.exit(main())