-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsystem.py
executable file
·183 lines (151 loc) · 6.15 KB
/
system.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/python
import crypto
import logging
class System:
def __init__(self):
self.db = None
self.crypto = crypto.Crypto()
self.log = logging.getLogger("System")
self.log.setLevel(logging.DEBUG)
def setDB(self, db):
self.db = db
def init(self):
if self.db.getUser("admin") == None:
self.log.warn("Creating admin user with default password")
self.addUser("admin", "admin")
self.grantAdmin("admin")
self.enablePasswordAuth("admin")
self.generateKeysForUser("admin", "password")
return True
def addUser(self, username, displayName):
self.log.info("Creating user " + username)
ret = self.db.addUser(username, displayName)
self.db.sync()
return ret
def listUsers(self):
return self.db.listUsers()
def getUser(self, user):
return self.db.getUser(user)
def setUserDisplayName(self, username, displayName):
self.log.info("Changing display name for " + username)
self.db.updateUserField(username, "displayName", displayName)
self.db.sync()
return True
def grantAdmin(self, username):
self.log.info("Granting admin access to " + username)
self.db.updateUserField(username, "admin", "Y")
self.db.sync()
return True
def revokeAdmin(self, username):
self.log.info("Revoking admin access from " + username)
self.db.updateUserField(username, "admin", "N")
self.db.sync()
return True
def enableUser(self, username):
self.log.info("Enabling user " + username)
self.db.updateUserField(username, "enabled", "Y")
self.db.sync()
return True
def disableUser(self, username):
self.log.info("Disabling user " + username)
self.db.updateUserField(username, "enabled", "N")
self.db.sync()
return True
def enablePasswordAuth(self, username):
self.log.info("Enabling password auth for user " + username)
self.db.updateUserField(username, "passwordAuth", "Y")
self.db.sync()
return True
def disablePasswordAuth(self, username):
self.log.info("Disabling password auth for user " + username)
self.db.updateUserField(username, "passwordAuth", "N")
self.db.sync()
return True
def getUserPublicKey(self, username):
data = self.getUser(username)
if data != None and "publicKey" in data:
return data["publicKey"]
return None
def setUserPublicKey(self, username, pem, keyType):
self.log.info("Changing the public key for " + username)
self.db.updateUserField(username, "publicKey", pem)
self.db.updateUserField(username, "keyType", keyType)
self.db.sync()
return True
def setUserPrivateKey(self, username, key):
self.log.info("Changing the private key for " + username)
self.db.updateUserField(username, "encryptedPrivateKey", key)
self.db.sync()
return True
def generateKeysForUser(self, username, password):
self.log.info("Generating keys for " + username)
key = self.crypto.keyStretchPassword(username, password)
(priv, pub) = self.crypto.generatePublicPrivateKeys()
self.setUserPublicKey(username, pub, "RSA")
encryptedPriv = self.crypto.encrypt(key, priv)
# print("new priv key="+str(encryptedPriv))
self.setUserPrivateKey(username, encryptedPriv)
self.db.sync()
return True
def getUserPrivateKey(self, username, password):
self.log.warning("Retriving private key for " + username)
key = self.crypto.keyStretchPassword(username, password)
encryptedPriv = self.db.getUser(username)["encryptedPrivateKey"]
# print("epriv="+str(encryptedPriv))
priv = self.crypto.decrypt(key, encryptedPriv)
return priv
def clearUserPrivateKey(self, username):
self.log.info("Removing private key for " + username)
self.db.removeUserField(username, "encryptedPrivateKey")
self.db.sync()
return True
def addSecret(
self, owner, secretEncryptionProfile, encryptedKey, encryptedSecret, hmac
):
self.log.info("Adding secret for " + owner + ", hmac=" + hmac)
sid = self.db.addSecret(
owner, secretEncryptionProfile, encryptedKey, encryptedSecret, hmac
)
self.db.sync()
return sid
def updateSecret(self, sid, encryptedSecret, hmac):
self.log.info("Updating secret " + sid)
self.db.updateSecret(sid, encryptedSecret, hmac)
self.db.sync()
return True
def shareSecret(self, sid, user, encryptedKey):
self.log.info("Sharing secret " + sid + " to " + user)
self.db.shareSecret(sid, user, encryptedKey)
self.db.sync()
return True
def unshareSecret(self, sid, user):
self.log.info("Revoking access to secret " + sid + " from " + user)
self.db.unshareSecret(sid, user)
self.db.sync()
def getSecret(self, sid):
return self.db.getSecret(sid)
def getSecretsForUser(self, user):
return self.db.getSecretsForUser(user)
def doesUserHaveReadAccess(self, username, sid):
entry = self.getSecret(sid)
if username in entry["users"]:
return True
return False
def doesUserHaveWriteAccess(self, username, sid):
entry = self.getSecret(sid)
if username in entry["users"]:
userEntry = entry["users"][username]
return "canWrite" in userEntry and "Y" == userEntry["canWrite"]
return False
def doesUserHaveShareAccess(self, username, sid):
entry = self.getSecret(sid)
if username in entry["users"]:
userEntry = entry["users"][username]
return "canShare" in userEntry and "Y" == userEntry["canShare"]
return False
def doesUserHaveUnshareAccess(self, username, sid):
entry = self.getSecret(sid)
if username in entry["users"]:
userEntry = entry["users"][username]
return "canUnshare" in userEntry and "Y" == userEntry["canUnshare"]
return False