-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathpatch-nspawn-ids
executable file
·117 lines (96 loc) · 4.55 KB
/
patch-nspawn-ids
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python3
import itertools as it, operator as op, functools as ft
import os, sys, stat, pathlib, contextlib, logging
class ContainerIDMismatch(Exception):
    """Raised when a uid/gid carries a container id other than the expected one(s).

    Attributes:
        p: path of the filesystem node the id was read from.
        cid_set: expected container id(s) - either a single int or an
            iterable of ints, exactly as passed to assert_cid_match().
        cid2: container id (upper 16 bits of the uid/gid) actually found.
    """

    def __init__(self, p, cid_set, cid2):
        # Pass values to Exception too, so that .args / repr() / pickling are useful.
        super().__init__(p, cid_set, cid2)
        self.p, self.cid_set, self.cid2 = p, cid_set, cid2

    def __str__(self):
        # cid_set may be a bare int (single expected id), which cannot be
        # iterated/sorted - wrap it so that the message can always be built.
        if isinstance(self.cid_set, int):
            expected = [self.cid_set]
        else:
            expected = sorted(self.cid_set)
        cid_set = ', '.join(f'{cid:x}' for cid in expected)
        return f'{self.p} :: {self.cid2:x} not in [{cid_set}]'
def assert_cid_match(p, cid_or_set, n=None, s=None):
    """Check that uid/gid value(s) belong to the expected container id(s).

    Args:
        p: path the ids were read from, only used in the raised error.
        cid_or_set: expected container id as an int, or a container of
            acceptable container ids.
        n: single uid or gid value to check (used when `s` is not given).
        s: stat-result-like object; when given, both its st_uid and
            st_gid are checked (in that order) and `n` is ignored.

    Raises:
        ContainerIDMismatch: if a checked id has a different container id.
    """
    if s is not None:
        # Check owner first, then group - same as two explicit single-id calls.
        for attr in ('st_uid', 'st_gid'):
            assert_cid_match(p, cid_or_set, getattr(s, attr))
        return
    found = id_cid(n)
    if isinstance(cid_or_set, int):
        matched = cid_or_set == found
    else:
        matched = found in cid_or_set
    if matched:
        return
    raise ContainerIDMismatch(p, cid_or_set, found)
def id_cid(n):
    """Return container-id part of uid/gid `n`: its bits 16-31, left in place (not shifted down)."""
    return n & (0xffff << 16)
def id_base(n):
    """Return in-container part of uid/gid `n`: its lower 16 bits."""
    return n & ((1 << 16) - 1)
def patch(cid, n):
    """Return uid/gid `n` with its lower 16 bits kept and `cid` OR-ed in on top of them."""
    return cid | id_base(n)
def id_repr(uid_gid, gid=None, as_hex=False):
    """Format an uid/gid pair as "uid:gid", or as a single number when both are equal.

    Args:
        uid_gid: uid value, or - when `gid` is None - an iterable
            yielding exactly two values (uid, gid).
        gid: gid value, or None to unpack both values from `uid_gid`.
        as_hex: format numbers as lowercase hex (no prefix) instead of decimal.
    """
    uid = uid_gid
    if gid is None:
        uid, gid = uid_gid
    spec = 'x' if as_hex else ''
    text = format(uid, spec)
    if uid != gid:
        text += ':' + format(gid, spec)
    return text
def shift_path_ids(path, cid, old_cid_whitelist=None, dry_run=False):
    """Recursively re-own `path` and everything under it into container `cid`.

    For every node, lower 16 bits of its uid/gid are kept and `cid` is OR-ed
    on top of them (see patch()). Nodes are visited top-down: the path itself,
    then entries of each directory as os.walk() yields them. Symlinks are
    never followed, the link itself gets chown'ed.

    Uses module-level `log`, which is set up by main().

    Args:
        path: pathlib.Path of the top-level node to process.
        cid: container id in its uid/gid form, i.e. already in upper 16 bits.
        old_cid_whitelist: optional container of container ids (same form
            as `cid`) that existing uid/gid values are allowed to have;
            no check is made if it is None or empty.
        dry_run: if true, run all checks and logging, but do not change anything.

    Raises:
        ContainerIDMismatch: if a node has uid/gid outside of `old_cid_whitelist`.
        OSError: from lstat/chown/chmod calls.
    """
    # Last two fields of each patch-line are destination values, not source ones.
    log.debug( 'Patch-lines format: <path> :: <src-uid/gid-dec>'
        ' [<src-uid/gid-hex>-<src-uid/gid-base>] -> <dst-uid/gid-dec> [<dst-cid-hex>]' )

    def shift_node(p):
        s = p.lstat()
        if old_cid_whitelist:
            assert_cid_match(p, old_cid_whitelist, s=s)
        src_ids = s.st_uid, s.st_gid
        dst_uid, dst_gid = (patch(cid, n) for n in src_ids)
        if (dst_uid, dst_gid) == src_ids:
            return  # already has the right ids, nothing to do
        if log.isEnabledFor(logging.DEBUG):
            log.debug('patch: {} :: {} [{}-{}] -> {} [{:x}]'.format(
                p, id_repr(*src_ids),
                id_repr([id_cid(n) for n in src_ids], as_hex=True),
                id_repr([id_base(n) for n in src_ids]),
                id_repr(dst_uid, dst_gid), cid >> 16 ))
        if dry_run:
            return
        os.chown(p, dst_uid, dst_gid, follow_symlinks=False)
        # Linux can drop stuff like suid/sgid bits after chown, so restore these
        if not stat.S_ISLNK(s.st_mode):
            os.chmod(p, stat.S_IMODE(s.st_mode))

    shift_node(path)
    for top, dirs, files in os.walk(path):
        top = pathlib.Path(top)
        # Subdirs get patched here, before os.walk() descends into them.
        for fn in it.chain(dirs, files):
            shift_node(top / fn)
def _cid_from_container_id(container_id):
    """Convert --container-id value into its uid/gid form (id in the upper 16 bits).

    A 16-bit value (0-0xffff) is shifted into the upper 16 bits. A larger value
    with all-zero lower 16 bits is taken to be already shifted and returned as-is.
    Anything else would clobber in-container uid/gid bits, so ValueError is raised.
    """
    if 0 <= container_id <= 0xffff:
        return container_id << 16
    if 0 < container_id <= 0xffff0000 and not container_id & 0xffff:
        return container_id
    raise ValueError( '--container-id must be a 16-bit value (0-65535),'
        f' or an already-shifted one with zero lower 16 bits: {container_id}' )


def main(args=None):
    """Command-line entry point: parse options and patch uid/gid values under given paths.

    Args:
        args: list of command-line arguments, sys.argv[1:] is used if None.

    Returns None. Exits via argparse (SystemExit) on invalid options.
    ContainerIDMismatch and OSError from checks/patching are not handled here.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description='Tool to shift ("patch") uid/gid values recursively for a path.'
            ' Does same thing that nspawn-patch-uid.c'
                ' does in systemd, except without ACLs and standalone.'
            ' Shift/patch is NOT simply adding some value to all uid/gids, resulting'
                ' schema is "upper 16bit of a UID identify container, lower 16bit are UID within".'
            ' Unlike systemd-nspawn, works in a top-down (breadth-first) fashion.')
    parser.add_argument('path', nargs='+',
        help='Path(s) to patch uid/gid values in.')
    parser.add_argument('-r', '--reference', metavar='path',
        help='Reference path to e.g. container root path,'
            ' which has container-mapped root user uid/gid set on it.')
    parser.add_argument('-c', '--container-id', metavar='int', type=int,
        help='16-bit container id value, to be set in upper 16 bits of each uid/gid.'
            ' Values above 65535 with zero lower 16 bits are accepted as already-shifted.')
    parser.add_argument('-s', '--strict', action='store_true',
        help='Abort operation if upper 16 bits of uid/gid'
                ' have different value from that of top-level path.'
            ' Can be used to make sure that container paths are not "merged" and that no uids'
                ' on pre-container fs go beyond 16 bits (and hence are incompatible with new schema).')
    parser.add_argument('-n', '--dry-run', action='store_true',
        help='Do not change anything on fs, only navigate over it and run all the checks.')
    parser.add_argument('-d', '--debug', action='store_true', help='Verbose operation mode.')
    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    # Module-level logger, also used by shift_path_ids()
    global log
    logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
    log = logging.getLogger('main')

    # Exactly one source of the target container id must be given
    if (opts.container_id is not None) == bool(opts.reference):
        parser.error('Either --container-id or --reference must be specified, but not both.')

    if opts.container_id is not None:
        # Rest of the code expects cid in the upper 16 bits, same as id_cid()
        # returns - using raw 16-bit value would be OR-ed into in-container uid bits.
        try:
            cid = _cid_from_container_id(opts.container_id)
        except ValueError as err:
            parser.error(str(err))
    else:
        p = pathlib.Path(opts.reference)
        s = p.lstat()
        cid = id_cid(s.st_uid)
        assert_cid_match(p, cid, s.st_gid)
        log.debug(f'Reference cid-hex={cid:x} (from uid-hex={s.st_uid:x})')

    paths = [pathlib.Path(p).resolve() for p in opts.path]
    for p in paths:
        cid_whitelist = None
        if opts.strict:
            # Only container id of the top-level path is allowed anywhere under it
            s = p.lstat()
            cid_chk = id_cid(s.st_uid)
            assert_cid_match(p, cid_chk, s.st_gid)
            cid_whitelist = [cid_chk]
        shift_path_ids(p, cid, old_cid_whitelist=cid_whitelist, dry_run=opts.dry_run)
# Run as a script: use return value of main() as the process exit status.
if __name__ == '__main__':
    sys.exit(main())