-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth_remote_user
executable file
·230 lines (186 loc) · 7.48 KB
/
auth_remote_user
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/usr/bin/env python3
'''
Ext filter that will validate POST requests and make sure REMOTE_USER is used
as username. This enables to use Saltstack's sharedsecret auth module and we
can use technologies as Kerberos GSSAPI to authenticate an API user to
Saltstack. For more information, visit:
https://github.com/stockholmuniversity/salt-netapi-remoteuser-auth
'''
import argparse
import json
import os
import re
import sys
import unittest
import urllib.parse
# pylint: disable=import-error
import yaml
# pylint: disable=unused-import
from su.logging import logging, structured # noqa: F401
def update_indata_dict(indata_dict, remote_user, password, eauth):
''' Always update username to REMOTE_USER. This will prevent a user to
tamper with the username parameter.
'''
indata_dict.update(username=remote_user)
indata_dict.update(password=password)
indata_dict.update(eauth=eauth)
def update_indata_lowstate(indata, remote_user, password, eauth):
''' When using JSON for the lowstate you can send multiple lowstates as a
list, so we need to handle that.
'''
if isinstance(indata, list):
for lowstate in indata:
update_indata_dict(lowstate, remote_user, password, eauth)
else:
update_indata_dict(indata, remote_user, password, eauth)
def main(stdin):
''' Take data on standard in and make sure we are setting username to
REMOTE_USER.
'''
logger = logging.getLogger("auth_remote_user")
logger.setLevel(logging.INFO)
remote_user = os.getenv("REMOTE_USER")
if remote_user is None:
logger.error("REMOTE_USER isn't set, check your configuration.")
sys.exit(1)
remote_user = authz_regex(username=remote_user)
password = None
eauth = os.environ.get('EAUTH', 'sharedsecret')
if eauth == 'sharedsecret':
shared_secret_config = os.environ.get('SHARED_SECRET_CONFIG')
if shared_secret_config is None:
logger.error(
"Can't find required environment variables: {env}".format(
env={
"remote_user": remote_user,
"shared_secret_config": shared_secret_config,
}))
sys.exit(1)
yaml_data = {}
try:
with open(shared_secret_config, 'r') as stream:
yaml_data = yaml.safe_load(stream)
# pylint: disable=invalid-name
except (yaml.YAMLError, FileNotFoundError) as e:
logger.exception(e)
sys.exit(1)
password = yaml_data.get('sharedsecret')
if password is None:
logger.error("Can't find sharedsecret in config {}".format(
shared_secret_config))
sys.exit(1)
indata = stdin.read()
indata_dict = {}
request_parameters = ""
if os.environ.get('CONTENT_TYPE') == 'application/x-www-form-urlencoded':
logger.debug("Handling a {} request".format(
os.environ.get('CONTENT_TYPE')))
indata_dict = urllib.parse.parse_qs(indata)
update_indata_lowstate(indata_dict, remote_user, password, eauth)
request_parameters = urllib.parse.urlencode(indata_dict, doseq=True)
elif os.environ.get('CONTENT_TYPE') == 'application/json':
logger.debug("Handling a {} request".format(
os.environ.get('CONTENT_TYPE')))
indata_dict = None
try:
indata_dict = json.loads(indata)
# pylint: disable=invalid-name
except json.decoder.JSONDecodeError as e:
logger.exception(e)
sys.exit(1)
update_indata_lowstate(indata_dict, remote_user, password, eauth)
request_parameters = json.dumps(indata_dict)
else:
logger.error("Unknown request with content_type={}".format(
os.environ.get('CONTENT_TYPE')))
sys.exit(1)
print(request_parameters)
def authz_regex(username=os.getenv("REMOTE_USER"),
pattern=os.getenv("REMOTE_USER_PATTERN", "")):
"""
Authorizes a username based on a regex.
Also provides the option to modify the username via a capture group. The
first group is used as the username.
"""
if username is None:
return None
match = re.match(pattern, username)
# The regex matched grouped or not
if match is not None:
matched_username = None
try:
matched_username = match.group(1)
except IndexError:
return username
return matched_username
return None
class SaltAuthZTest(unittest.TestCase):
# pylint: disable=missing-docstring,anomalous-backslash-in-string
remote_user_pattern = '^(.+?)\/root@SU\.SE$' # noqa: W605
def test_root(self):
self.assertEqual(
authz_regex(username="simlu/[email protected]",
pattern=self.remote_user_pattern), "simlu")
def test_hackerman(self):
self.assertEqual(
authz_regex(username="/[email protected]",
pattern=self.remote_user_pattern), None)
def test_a(self):
self.assertEqual(
authz_regex(username="a/[email protected]",
pattern=self.remote_user_pattern), "a")
def test_admin(self):
self.assertEqual(
authz_regex(username="simlu/[email protected]",
pattern=self.remote_user_pattern), None)
def test_regular(self):
self.assertEqual(
authz_regex(username="[email protected]",
pattern=self.remote_user_pattern), None)
def test_regular_no_realm(self):
self.assertEqual(
authz_regex(username="simlu", pattern=self.remote_user_pattern),
None)
def test_regular_no_pattern(self):
self.assertEqual(authz_regex(username="[email protected]"), "[email protected]")
def test_realm_pattern_allowed(self):
self.assertEqual(
authz_regex(username="[email protected]", pattern=r".*@SU\.SE$"),
def test_realm_pattern_denied(self):
self.assertEqual(
authz_regex(username="[email protected]", pattern=r".*@SU\.SE$"), None)
def test_nada(self):
self.assertEqual(authz_regex(), None)
def test_none(self):
self.assertEqual(authz_regex(username=None), None)
def test_none_everything(self):
self.assertEqual(authz_regex(username=None, pattern=None), None)
class SaltAuthZMultiUserTest(unittest.TestCase):
# pylint: disable=missing-docstring,anomalous-backslash-in-string
remote_user_pattern = r'^(su-.+?|.+?(?=\/root))(?:\/root)?@SU\.SE$'
def test_regular(self):
self.assertEqual(
authz_regex(username="[email protected]",
pattern=self.remote_user_pattern), None)
def test_serviceaccount(self):
self.assertEqual(
authz_regex(username="[email protected]",
pattern=self.remote_user_pattern), 'su-ci-prod')
def test_root(self):
self.assertEqual(
authz_regex(username="simlu/[email protected]",
pattern=self.remote_user_pattern), 'simlu')
def test_ppp(self):
self.assertEqual(
authz_regex(username="simlu/[email protected]",
pattern=self.remote_user_pattern), None)
if __name__ == "__main__":
PARSER = argparse.ArgumentParser(
description='Authenticate REMOTE_USER for Salt NetAPI')
PARSER.add_argument('--test', action='store_true', help='Run unit tests')
ARGS = PARSER.parse_args()
if ARGS.test:
unittest.main(argv=[sys.argv[0]])
else:
main(sys.stdin)