-
Notifications
You must be signed in to change notification settings - Fork 81
/
Copy pathgettgtpkinit.py
349 lines (296 loc) · 14.5 KB
/
gettgtpkinit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#!/usr/bin/env python3
#
# Based on examples from minikerberos by skelsec
# Parts of this code was inspired by the following project by @rubin_mor
# https://github.com/morRubin/AzureADJoinedMachinePTC
#
# Author:
# Tamas Jos (@skelsec)
# Dirk-jan Mollema (@_dirkjan)
#
import argparse
import logging
import binascii
import secrets
import datetime
import hashlib
import base64
from oscrypto.keys import parse_pkcs12, parse_certificate, parse_private
from oscrypto.asymmetric import rsa_pkcs1v15_sign, load_private_key
from asn1crypto import cms
from asn1crypto import algos
from asn1crypto import core
from asn1crypto import keys
from minikerberos import logger
from minikerberos.pkinit import PKINIT, DirtyDH
from minikerberos.common.ccache import CCACHE
from minikerberos.common.target import KerberosTarget
from minikerberos.network.clientsocket import KerberosClientSocket
from minikerberos.protocol.constants import NAME_TYPE, PaDataType
from minikerberos.protocol.encryption import Enctype, _checksum_table, _enctype_table, Key
from minikerberos.protocol.structures import AuthenticatorChecksum
from minikerberos.protocol.asn1_structs import KDC_REQ_BODY, PrincipalName, HostAddress, \
KDCOptions, EncASRepPart, AP_REQ, AuthorizationData, Checksum, krb5_pvno, Realm, \
EncryptionKey, Authenticator, Ticket, APOptions, EncryptedData, AS_REQ, AP_REP, PADATA_TYPE, \
PA_PAC_REQUEST
from minikerberos.protocol.rfc4556 import PKAuthenticator, AuthPack, Dunno2, MetaData, Info, CertIssuer, CertIssuers, PA_PK_AS_REP, KDCDHKeyInfo, PA_PK_AS_REQ
class myPKINIT(PKINIT):
"""
Copy of minikerberos PKINIT
With some changes where it differs from PKINIT used in NegoEx
"""
@staticmethod
def from_pfx(pfxfile, pfxpass, dh_params = None):
with open(pfxfile, 'rb') as f:
pfxdata = f.read()
return myPKINIT.from_pfx_data(pfxdata, pfxpass, dh_params)
@staticmethod
def from_pfx_data(pfxdata, pfxpass, dh_params = None):
pkinit = myPKINIT()
# oscrypto does not seem to support pfx without password, so convert it to PEM using cryptography instead
if not pfxpass:
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.hazmat.primitives import serialization
privkey, cert, extra_certs = pkcs12.load_key_and_certificates(pfxdata, None)
pem_key = privkey.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
pkinit.privkey = load_private_key(parse_private(pem_key))
pem_cert = cert.public_bytes(
encoding=serialization.Encoding.PEM
)
pkinit.certificate = parse_certificate(pem_cert)
else:
#print('Loading pfx12')
if isinstance(pfxpass, str):
pfxpass = pfxpass.encode()
pkinit.privkeyinfo, pkinit.certificate, pkinit.extra_certs = parse_pkcs12(pfxdata, password=pfxpass)
pkinit.privkey = load_private_key(pkinit.privkeyinfo)
#print('pfx12 loaded!')
pkinit.setup(dh_params = dh_params)
return pkinit
@staticmethod
def from_pem(certfile, privkeyfile, dh_params = None):
pkinit = myPKINIT()
with open(certfile, 'rb') as f:
pkinit.certificate = parse_certificate(f.read())
with open(privkeyfile, 'rb') as f:
pkinit.privkey = load_private_key(parse_private(f.read()))
pkinit.setup(dh_params = dh_params)
return pkinit
def sign_authpack(self, data, wrap_signed = False):
return self.sign_authpack_native(data, wrap_signed)
def setup(self, dh_params = None):
self.issuer = self.certificate.issuer.native['common_name']
if dh_params is None:
print('Generating DH params...')
# self.diffie = DirtyDH.from_dict()
print('DH params generated.')
else:
#print('Loading default DH params...')
if isinstance(dh_params, dict):
self.diffie = DirtyDH.from_dict(dh_params)
elif isinstance(dh_params, bytes):
self.diffie = DirtyDH.from_asn1(dh_params)
elif isinstance(dh_params, DirtyDH):
self.diffie = dh_params
else:
raise Exception('DH params must be either a bytearray or a dict')
def build_asreq(self, domain = None, cname = None, kdcopts = ['forwardable','renewable','renewable-ok']):
if isinstance(kdcopts, list):
kdcopts = set(kdcopts)
if cname is not None:
if isinstance(cname, str):
cname = [cname]
else:
cname = [self.cname]
# if target is not None:
# if isinstance(target, str):
# target = [target]
# else:
# target = ['127.0.0.1']
now = datetime.datetime.now(datetime.timezone.utc)
kdc_req_body_data = {}
kdc_req_body_data['kdc-options'] = KDCOptions(kdcopts)
kdc_req_body_data['cname'] = PrincipalName({'name-type': NAME_TYPE.PRINCIPAL.value, 'name-string': cname})
kdc_req_body_data['realm'] = domain.upper()
kdc_req_body_data['sname'] = PrincipalName({'name-type': NAME_TYPE.SRV_INST.value, 'name-string': ['krbtgt', domain.upper()]})
kdc_req_body_data['till'] = (now + datetime.timedelta(days=1)).replace(microsecond=0)
kdc_req_body_data['rtime'] = (now + datetime.timedelta(days=1)).replace(microsecond=0)
kdc_req_body_data['nonce'] = secrets.randbits(31)
kdc_req_body_data['etype'] = [18,17] # 23 breaks...
# kdc_req_body_data['addresses'] = [HostAddress({'addr-type': 20, 'address': b'127.0.0.1'})] # not sure if this is needed
kdc_req_body = KDC_REQ_BODY(kdc_req_body_data)
checksum = hashlib.sha1(kdc_req_body.dump()).digest()
authenticator = {}
authenticator['cusec'] = now.microsecond
authenticator['ctime'] = now.replace(microsecond=0)
authenticator['nonce'] = secrets.randbits(31)
authenticator['paChecksum'] = checksum
dp = {}
dp['p'] = self.diffie.p
dp['g'] = self.diffie.g
dp['q'] = 0 # mandatory parameter, but it is not needed
pka = {}
pka['algorithm'] = '1.2.840.10046.2.1'
pka['parameters'] = keys.DomainParameters(dp)
spki = {}
spki['algorithm'] = keys.PublicKeyAlgorithm(pka)
spki['public_key'] = self.diffie.get_public_key()
authpack = {}
authpack['pkAuthenticator'] = PKAuthenticator(authenticator)
authpack['clientPublicValue'] = keys.PublicKeyInfo(spki)
authpack['clientDHNonce'] = self.diffie.dh_nonce
authpack = AuthPack(authpack)
signed_authpack = self.sign_authpack(authpack.dump(), wrap_signed = True)
payload = PA_PK_AS_REQ()
payload['signedAuthPack'] = signed_authpack
pa_data_1 = {}
pa_data_1['padata-type'] = PaDataType.PK_AS_REQ.value
pa_data_1['padata-value'] = payload.dump()
pa_data_0 = {}
pa_data_0['padata-type'] = int(PADATA_TYPE('PA-PAC-REQUEST'))
pa_data_0['padata-value'] = PA_PAC_REQUEST({'include-pac': True}).dump()
asreq = {}
asreq['pvno'] = 5
asreq['msg-type'] = 10
asreq['padata'] = [pa_data_0, pa_data_1]
asreq['req-body'] = kdc_req_body
return AS_REQ(asreq).dump()
def sign_authpack_native(self, data, wrap_signed = False):
"""
Creating PKCS7 blob which contains the following things:
1. 'data' blob which is an ASN1 encoded "AuthPack" structure
2. the certificate used to sign the data blob
3. the singed 'signed_attrs' structure (ASN1) which points to the "data" structure (in point 1)
"""
da = {}
da['algorithm'] = algos.DigestAlgorithmId('1.3.14.3.2.26') # for sha1
si = {}
si['version'] = 'v1'
si['sid'] = cms.IssuerAndSerialNumber({
'issuer': self.certificate.issuer,
'serial_number': self.certificate.serial_number,
})
si['digest_algorithm'] = algos.DigestAlgorithm(da)
si['signed_attrs'] = [
cms.CMSAttribute({'type': 'content_type', 'values': ['1.3.6.1.5.2.3.1']}), # indicates that the encap_content_info's authdata struct (marked with OID '1.3.6.1.5.2.3.1' is signed )
cms.CMSAttribute({'type': 'message_digest', 'values': [hashlib.sha1(data).digest()]}), ### hash of the data, the data itself will not be signed, but this block of data will be.
]
si['signature_algorithm'] = algos.SignedDigestAlgorithm({'algorithm' : '1.2.840.113549.1.1.1'})
si['signature'] = rsa_pkcs1v15_sign(self.privkey, cms.CMSAttributes(si['signed_attrs']).dump(), "sha1")
ec = {}
ec['content_type'] = '1.3.6.1.5.2.3.1'
ec['content'] = data
sd = {}
sd['version'] = 'v3'
sd['digest_algorithms'] = [algos.DigestAlgorithm(da)] # must have only one
sd['encap_content_info'] = cms.EncapsulatedContentInfo(ec)
sd['certificates'] = [self.certificate]
sd['signer_infos'] = cms.SignerInfos([cms.SignerInfo(si)])
if wrap_signed is True:
ci = {}
ci['content_type'] = '1.2.840.113549.1.7.2' # signed data OID
ci['content'] = cms.SignedData(sd)
return cms.ContentInfo(ci).dump()
return cms.SignedData(sd).dump()
def decrypt_asrep(self, as_rep):
def truncate_key(value, keysize):
output = b''
currentNum = 0
while len(output) < keysize:
currentDigest = hashlib.sha1(bytes([currentNum]) + value).digest()
if len(output) + len(currentDigest) > keysize:
output += currentDigest[:keysize - len(output)]
break
output += currentDigest
currentNum += 1
return output
for pa in as_rep['padata']:
if pa['padata-type'] == 17:
pkasrep = PA_PK_AS_REP.load(pa['padata-value']).native
break
else:
raise Exception('PA_PK_AS_REP not found!')
ci = cms.ContentInfo.load(pkasrep['dhSignedData']).native
sd = ci['content']
keyinfo = sd['encap_content_info']
if keyinfo['content_type'] != '1.3.6.1.5.2.3.2':
raise Exception('Keyinfo content type unexpected value')
authdata = KDCDHKeyInfo.load(keyinfo['content']).native
pubkey = int(''.join(['1'] + [str(x) for x in authdata['subjectPublicKey']]), 2)
pubkey = int.from_bytes(core.BitString(authdata['subjectPublicKey']).dump()[7:], 'big', signed = False)
shared_key = self.diffie.exchange(pubkey)
server_nonce = pkasrep['serverDHNonce']
fullKey = shared_key + self.diffie.dh_nonce + server_nonce
etype = as_rep['enc-part']['etype']
cipher = _enctype_table[etype]
if etype == Enctype.AES256:
t_key = truncate_key(fullKey, 32)
elif etype == Enctype.AES128:
t_key = truncate_key(fullKey, 16)
elif etype == Enctype.RC4:
raise NotImplementedError('RC4 key truncation documentation missing. it is different from AES')
#t_key = truncate_key(fullKey, 16)
key = Key(cipher.enctype, t_key)
enc_data = as_rep['enc-part']['cipher']
logger.info('AS-REP encryption key (you might need this later):')
logger.info(binascii.hexlify(t_key).decode('utf-8'))
dec_data = cipher.decrypt(key, 3, enc_data)
encasrep = EncASRepPart.load(dec_data).native
cipher = _enctype_table[ int(encasrep['key']['keytype'])]
session_key = Key(cipher.enctype, encasrep['key']['keyvalue'])
return encasrep, session_key, cipher
def amain(args):
# Static DH params because the ones generated by cryptography are considered unsafe by AD for some weird reason
dhparams = {
'p':int('00ffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece65381ffffffffffffffff', 16),
'g':2
}
logger.info('Loading certificate and key from file')
if args.pfx_base64:
pfxdata = base64.b64decode(args.pfx_base64)
ini = myPKINIT.from_pfx_data(pfxdata, args.pfx_pass, dhparams)
elif args.cert_pfx:
ini = myPKINIT.from_pfx(args.cert_pfx, args.pfx_pass, dhparams)
elif args.cert_pem and args.key_pem:
ini = myPKINIT.from_pem(args.cert_pem, args.key_pem, dhparams)
else:
logging.error('You must either specify a PFX file + optional password or a combination of Cert PEM file and Private key PEM file')
return
domain, username = args.identity.split('/')
req = ini.build_asreq(domain,username)
logger.info('Requesting TGT')
if not args.dc_ip:
args.dc_ip = domain
sock = KerberosClientSocket(KerberosTarget(args.dc_ip))
res = sock.sendrecv(req)
encasrep, session_key, cipher = ini.decrypt_asrep(res.native)
ccache = CCACHE()
ccache.add_tgt(res.native, encasrep)
ccache.to_file(args.ccache)
logger.info('Saved TGT to file')
def main():
import argparse
parser = argparse.ArgumentParser(description='Requests a TGT using Kerberos PKINIT and either a PEM or PFX based certificate+key')
parser.add_argument('identity', action='store', metavar='domain/username', help='Domain and username in the cert')
parser.add_argument('ccache', help='ccache file to store the TGT in')
parser.add_argument('-cert-pfx', action='store', metavar='file', help='PFX file')
parser.add_argument('-pfx-pass', action='store', metavar='password', help='PFX file password')
parser.add_argument('-pfx-base64', action='store', metavar='BASE64', help='PFX file as base64 string')
parser.add_argument('-cert-pem', action='store', metavar='file', help='Certificate in PEM format')
parser.add_argument('-key-pem', action='store', metavar='file', help='Private key file in PEM format')
parser.add_argument('-dc-ip', help='DC IP or hostname to use as KDC')
parser.add_argument('-v', '--verbose', action='count', default=0)
args = parser.parse_args()
if args.verbose == 0:
logger.setLevel(logging.INFO)
elif args.verbose == 1:
logger.setLevel(logging.INFO)
else:
logger.setLevel(1)
amain(args)
if __name__ == '__main__':
main()