-
Notifications
You must be signed in to change notification settings - Fork 2
/
test.py
executable file
·1811 lines (1488 loc) · 64.2 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""Cryptopals tests"""
import gc
import io
import hmac
import time
import math
import json
import base64
import hashlib
import unittest
from unittest import mock
from functools import cache
from Crypto.Hash import MD4
from Crypto.Cipher import AES
from Crypto.Util import Counter
from Crypto.Random import get_random_bytes
from Crypto.Random.random import randrange, getrandbits
import m01
import m02
import m03
import m04
import m05
import m06
import m08
import m09
import m10
import m11
import m12
import m13
import m14
import m15
import m16
import m17
import m18
import m21
import m22
import m23
import m24
import m25
import m26
import m27
import m28
import m29
import m30
import m31
import m32
import m33
import m34
import m35
import m36
import m37
import m38
import m39
import m40
import m41
import m42
import m43
import m44
import m45
import m46
import m47
import m49
import m50
import m51
import m52
import m53
import m54
# Shared fixtures used by the test cases in this file.
BLOCKSIZE = 16  # block size, in bytes, handed to the ECB helpers
KEY = b"YELLOW SUBMARINE"  # 16-byte key
IV = b"\x00" * len(KEY)  # all-zero IV with the same length as KEY
MESSAGE = b"Attack at dawn"  # default plaintext
PRIME = 197  # small modulus passed to the Diffie-Hellman / SRP helpers
G = 2  # generator used together with PRIME
class Test01(unittest.TestCase):
    """Challenge 1: convert hex to base64."""

    def test_hex_to_base64_sanity(self) -> None:
        """The hex string "00" encodes to the base64 string "AA=="."""
        encoded = m01.hex_to_base64("00")
        self.assertEqual(encoded, "AA==")
class Test02(unittest.TestCase):
    """Challenge 2: fixed XOR."""

    def test_fixed_xor_sanity(self) -> None:
        """For every byte value a: a xor a == 0 and a xor 0 == a."""
        zero = b"\x00"
        for value in range(256):
            single = bytes([value])
            self.assertEqual(m02.fixed_xor(single, single), zero)
            self.assertEqual(m02.fixed_xor(single, zero), single)

    def test_fixed_xor_out_of_range(self) -> None:
        """A byte value outside [0, 255] ends in ValueError."""
        # NOTE: bytes([256]) itself raises ValueError, so the exception is
        # produced while building the argument, before fixed_xor is entered.
        with self.assertRaises(ValueError):
            m02.fixed_xor(bytes([256]), b"\x00")

    def test_fixed_xor_different_lengths(self) -> None:
        """Operands of different lengths are rejected with ValueError."""
        self.assertRaises(ValueError, m02.fixed_xor, b"0", b"01")
class Test03(unittest.TestCase):
    """Challenge 3: single-byte XOR cipher."""

    def test_xor_everything_sanity(self) -> None:
        """xor_everything(0x00) yields 256 entries, entry i being byte i."""
        candidates = m03.xor_everything(b"\x00")
        self.assertEqual(len(candidates), 256)
        for key in range(256):
            self.assertEqual(candidates[key], bytes([key]))

    def test_score_empty_input(self) -> None:
        """An empty input scores zero."""
        empty_score = m03.score(b"")
        self.assertEqual(empty_score, 0)
class Test04(unittest.TestCase):
    """Challenge 4: detect single-character XOR."""

    def test_find_xored_string(self) -> None:
        """find_xored_message picks the expected plaintext out of data/04.txt.

        Every line of the data file is hex-decoded and the resulting list of
        byte strings is handed to m04.find_xored_message.
        """
        # Explicit encoding so decoding does not depend on the locale default.
        with open("data/04.txt", "r", encoding="utf-8") as f:
            data = [bytes.fromhex(line) for line in f.read().splitlines()]
        xored_message = b"Now that the party is jumping\n"
        self.assertEqual(m04.find_xored_message(data), xored_message)
class Test05(unittest.TestCase):
    """Challenge 5: implement repeating-key XOR."""

    def test_repeating_key_xor(self) -> None:
        """Encrypting data/05.txt under the key "ICE" gives the known hex.

        Trailing whitespace is stripped from the file contents before they
        are encoded as ASCII and passed to m05.repeating_key_xor.
        """
        # Explicit encoding so decoding does not depend on the locale default.
        with open("data/05.txt", "r", encoding="utf-8") as f:
            message = bytes(f.read().rstrip(), "ascii")
        key = b"ICE"
        c = m05.repeating_key_xor(key, message)
        self.assertEqual(c.hex(), "0b3637272a2b2e63622c2e69692a23693a2a3"
                                  "c6324202d623d63343c2a2622632427276527"
                                  "2a282b2f20430a652e2c652a3124333a653e2"
                                  "b2027630c692b20283165286326302e27282f")
class Test06(unittest.TestCase):
    """Challenge 6: break repeating-key XOR."""

    @staticmethod
    def _read_cyphertext() -> bytes:
        """Return the base64-decoded contents of data/06.txt."""
        # Explicit encoding so decoding does not depend on the locale default.
        with open("data/06.txt", "r", encoding="utf-8") as f:
            return base64.b64decode(f.read())

    def test_hamming_distance(self) -> None:
        """Hamming distance matches the known value 37, and 0 for equal input."""
        s1, s2 = b"this is a test", b"wokka wokka!!!"
        self.assertEqual(m06.hamming_distance(s1, s2), 37)
        self.assertEqual(m06.hamming_distance(s2, s2), 0)

    def test_hamming_distance_wrong_size(self) -> None:
        """Inputs of different sizes are rejected with ValueError."""
        s1 = b"different"
        s2 = b"sizes"
        with self.assertRaises(ValueError):
            m06.hamming_distance(s1, s2)

    def test_key(self) -> None:
        """m06.key recovers the expected key from the data file."""
        cyphertext = self._read_cyphertext()
        self.assertEqual(m06.key(cyphertext), b"Terminator X: Bring the noise")

    def test_break_repeating_key_xor(self) -> None:
        """The first 33 bytes of the recovered plaintext match the known text."""
        cyphertext = self._read_cyphertext()
        self.assertEqual(m06.break_repeating_key_xor(cyphertext)[:33],
                         b"I'm back and I'm ringin' the bell")
class Test08(unittest.TestCase):
    """Challenge 8: detect AES in ECB mode."""

    def test_ecb_score(self) -> None:
        """ecb_score is 0 for one zero block and 1 for two zero blocks."""
        self.assertEqual(m08.ecb_score(bytes(BLOCKSIZE), BLOCKSIZE), 0)
        self.assertEqual(m08.ecb_score(bytes(2 * BLOCKSIZE), BLOCKSIZE), 1)

    def test_detct_ecb(self) -> None:
        """detect_ecb returns the cyphertext at index 132 of data/08.txt.

        Each line of the data file is hex-decoded; the value returned by
        m08.detect_ecb must be one of those cyphertexts.
        """
        # Explicit encoding so decoding does not depend on the locale default.
        with open("data/08.txt", "r", encoding="utf-8") as f:
            g = [bytes.fromhex(cyphertext)
                 for cyphertext in f.read().splitlines()]
        ecb_encrypted = m08.detect_ecb(g, BLOCKSIZE)
        self.assertIsNotNone(ecb_encrypted)
        index = g.index(ecb_encrypted)
        self.assertEqual(index, 132)
class Test09(unittest.TestCase):
    """Challenge 9: implement PKCS#7 padding."""

    def test_de_pkcs7_inverts_pkcs7(self) -> None:
        """Padding and then unpadding MESSAGE gives MESSAGE back."""
        padded = m09.pkcs7(MESSAGE)
        self.assertEqual(m09.de_pkcs7(padded), MESSAGE)

    def test_zero_padding(self) -> None:
        """A 16-byte message survives the pad/unpad round trip."""
        original = b"YELLOW SUBMARINE"
        round_tripped = m09.de_pkcs7(m09.pkcs7(original))
        self.assertEqual(round_tripped, original)

    def test_pad_big_block(self) -> None:
        """A block size of 256 + len(MESSAGE) raises PKCS7PaddingError."""
        oversized = 256 + len(MESSAGE)
        self.assertRaises(m09.PKCS7PaddingError, m09.pkcs7, MESSAGE, oversized)
class Test10(unittest.TestCase):
    """Challenge 10: implement CBC mode."""

    def test_aes_cbc_encrypt(self) -> None:
        """encrypt_aes_cbc agrees with Crypto.Cipher.AES in CBC mode."""
        reference = AES.new(KEY, AES.MODE_CBC, IV)
        expected = reference.encrypt(m09.pkcs7(MESSAGE))
        actual = m10.encrypt_aes_cbc(KEY, IV, m09.pkcs7(MESSAGE))
        self.assertEqual(actual, expected)

    def test_aes_cbc_decrypt(self) -> None:
        """decrypt_aes_cbc agrees with Crypto.Cipher.AES in CBC mode."""
        block = b"x\x9b\xdb\xf8\x93\xae[x\x9a%\xb7\xffT\x1fc\xd5"
        reference = AES.new(KEY, AES.MODE_CBC, IV)
        expected = reference.decrypt(block)
        actual = m10.decrypt_aes_cbc(KEY, IV, block)
        self.assertEqual(actual, expected)

    def test_aes_cbc_symmetry(self) -> None:
        """Decrypting an encrypted, padded MESSAGE returns the padded input."""
        padded = m09.pkcs7(MESSAGE)
        encrypted = m10.encrypt_aes_cbc(KEY, IV, padded)
        decrypted = m10.decrypt_aes_cbc(KEY, IV, encrypted)
        self.assertEqual(padded, decrypted)
class Test11(unittest.TestCase):
    """Challenge 11: an ECB/CBC detection oracle."""

    def test_detect_ecb(self) -> None:
        """detect_ecb answers True or False for ten oracle outputs."""
        verdicts = [True, False]
        attempts = 10
        for _ in range(attempts):
            encrypted = m11.encryption_oracle(MESSAGE * 3)
            self.assertIn(m11.detect_ecb(encrypted), verdicts)
class Test12(unittest.TestCase):
    """Challenge 12: byte-at-a-time ECB decryption (simple)."""

    def test_blocksize(self) -> None:
        """The block size detected for m12.oracle is 16."""
        detected = m12.blocksize(m12.oracle)
        self.assertEqual(detected, 16)

    def test_len_string(self) -> None:
        """The string length detected for m12.oracle is 138."""
        detected = m12.len_string(m12.oracle)
        self.assertEqual(detected, 138)

    def test_break_ecb(self) -> None:
        """The first line of the recovered text is the known opening line."""
        recovered = m12.break_ecb(m12.oracle)
        first_line = recovered.split(b"\n")[0]
        self.assertEqual(first_line, b"Rollin' in my 5.0")
class Test13(unittest.TestCase):
    """Challenge 13: ECB cut-and-paste."""

    def test_ecb_cut_and_paste_end_to_end(self) -> None:
        """A rewritten cookie decrypts to the same email with role "admin"."""
        # NOTE(review): this literal looks like a web-page e-mail obfuscation
        # placeholder rather than a real address -- confirm against the repo.
        email = "[email protected]"
        forged = m13.rewrite_cookie(email)
        profile = m13.decrypt_oracle(forged)
        self.assertEqual(profile["email"], email)
        self.assertEqual(profile["role"], "admin")
class Test14(unittest.TestCase):
    """Challenge 14: byte-at-a-time ECB decryption (harder)."""

    def test_byte_at_a_time_ecb_decryption_attack(self) -> None:
        """break_ecb on m14.oracle returns a truthy result."""
        recovered = m14.break_ecb(m14.oracle)
        self.assertTrue(recovered)
class Test15(unittest.TestCase):
    """Challenge 15: PKCS#7 padding validation."""

    def test_equal_pkcs7_padding(self) -> None:
        """m15.pkcs7 pads exactly like m09.pkcs7."""
        text = b"ICE ICE BABY"
        reference = m09.pkcs7(text)
        candidate = m15.pkcs7(text)
        self.assertEqual(reference, candidate)

    def test_valid_pkcs7_padding(self) -> None:
        """m15.de_pkcs7 strips padding produced by m09.pkcs7."""
        text = b"ICE ICE BABY"
        padded = m09.pkcs7(text)
        self.assertEqual(text, m15.de_pkcs7(padded))

    def test_invalid_pkcs7_padding(self) -> None:
        """m15.de_pkcs7 raises PKCS7PaddingError for malformed padding."""
        malformed = (
            b"ICE ICE BABY\x01\x02\x03\x04",
            b"ICE ICE BABY\x05\x05\x05\x05",
        )
        for padded in malformed:
            with self.assertRaises(m09.PKCS7PaddingError):
                m15.de_pkcs7(padded)
class Test16(unittest.TestCase):
    """Challenge 16: CBC bitflipping attacks."""

    def test_cbc_bitflipping_attack(self) -> None:
        """A cyphertext passed through cbc_bitflip is accepted by is_admin."""
        payload = bytes(16) + b"00000:admin<true"
        encrypted = m16.oracle(payload)
        tampered = m16.cbc_bitflip(encrypted)
        self.assertTrue(m16.is_admin(tampered))
class Test17(unittest.TestCase):
    """Challenge 17: the CBC padding oracle."""

    # sys.stdout is replaced by a MagicMock while the test runs, so anything
    # written to it does not reach the terminal.  The former ``_=io.StringIO``
    # keyword did not install a StringIO: patch() forwards extra keywords to
    # the mock constructor, so it merely set an attribute named ``_`` on the
    # MagicMock.  The keyword is dropped and the annotation names the object
    # that is really passed in.
    @mock.patch("sys.stdout")
    def test_cbc_padding_oracle_attack(self, _: mock.MagicMock) -> None:
        """The attack on a fresh oracle cyphertext returns a truthy result.

        Args:
            _: the MagicMock standing in for sys.stdout (unused).
        """
        c = m17.cbc_oracle()
        m = m17.attack(c)
        self.assertTrue(m)
class Test18(unittest.TestCase):
    """Challenge 18: implement CTR, the stream cipher mode."""

    def test_aes_ctr(self) -> None:
        """aes_ctr agrees with Crypto.Cipher.AES in CTR mode."""
        counter = Counter.new(128, initial_value=0)
        reference = AES.new(KEY, mode=AES.MODE_CTR, counter=counter)
        expected = reference.encrypt(MESSAGE)
        actual = m18.aes_ctr(MESSAGE, KEY)
        self.assertEqual(actual, expected)

    def test_aes_ctr_symmetry(self) -> None:
        """Applying aes_ctr twice with the same key returns MESSAGE."""
        encrypted = m18.aes_ctr(MESSAGE, KEY)
        decrypted = m18.aes_ctr(encrypted, KEY)
        self.assertEqual(decrypted, MESSAGE)
class Test21(unittest.TestCase):
    """Challenge 21: implement the MT19937 Mersenne Twister RNG."""

    def test_mersenne_twister(self) -> None:
        """The first output for seed 5489 is the known value 3499211612."""
        # Reference value: https://oeis.org/A221557
        generator = m21.MT19937(5489)
        first_output = generator.random()
        self.assertEqual(first_output, 3499211612)
class Test22(unittest.TestCase):
    """Challenge 22: crack an MT19937 seed."""

    @unittest.skip("Extremely long test")
    def test_crack_mt19937_seed(self) -> None:
        """A generator seeded with the cracked seed reproduces the output."""
        observed = m22.sleep_mersenne()
        recovered_seed = m22.crack_seed(observed)
        replica = m21.MT19937(recovered_seed)
        self.assertEqual(replica.random(), observed)

    def test_crack_artificial_mt19937_seed(self) -> None:
        """crack_seed recovers a seed set to one second before now."""
        seed = int(time.time()) - 1
        observed = m21.MT19937(seed).random()
        self.assertEqual(seed, m22.crack_seed(observed))
class Test23(unittest.TestCase):
    """Challenge 23: clone an MT19937 RNG from its output."""

    def test_clone_mt(self) -> None:
        """After cloning, original and clone produce the same next output."""
        original = m21.MT19937(5489)
        replica = m23.clone_mt(original)
        self.assertEqual(original.random(), replica.random())
class Test24(unittest.TestCase):
    """Challenge 24: create the MT19937 stream cipher and break it."""

    def test_verify_mt19937_crypt(self) -> None:
        """verify_mt19937_crypt accepts ten zero bytes with seed 0xffff."""
        verified = m24.verify_mt19937_crypt(bytes(10), 0xffff)
        self.assertTrue(verified)

    @unittest.skip("Long test")
    def test_crack_mt19937_crypt(self) -> None:
        """crack_mt19937 recovers a random 16-bit seed.

        The plaintext is a random prefix of fewer than 100 bytes followed by
        fourteen "A" bytes.
        """
        seed = getrandbits(16)
        prefix = get_random_bytes(randrange(100))
        encrypted = m24.mt19937_crypt(prefix + b"A" * 14, seed)
        self.assertEqual(m24.crack_mt19937(encrypted), seed)

    def test_crack_mt19937_crypt_contrived(self) -> None:
        """crack_mt19937 returns seed 1 for a fixed four-byte cyphertext."""
        contrived = b"d\xaa\xcd\t"
        recovered = m24.crack_mt19937(contrived)
        self.assertEqual(recovered, 1)
class Test25(unittest.TestCase):
    """Challenge 25: break "random access read/write" AES CTR."""

    def test_aes_ctr_rarw_attack(self) -> None:
        """break_rarw recovers MESSAGE encrypted under m25.RANDOM_KEY."""
        encrypted = m18.aes_ctr(MESSAGE, m25.RANDOM_KEY)
        recovered = m25.break_rarw(encrypted)
        self.assertEqual(MESSAGE, recovered)
class Test26(unittest.TestCase):
    """Challenge 26: CTR bitflipping."""

    def test_ctr_bitflipping(self) -> None:
        """A cyphertext passed through ctr_bitflip is accepted by is_admin."""
        payload = bytes(5) + b":admin<true"
        encrypted = m26.oracle(payload)
        tampered = m26.ctr_bitflip(encrypted)
        self.assertTrue(m26.is_admin(tampered))
class Test27(unittest.TestCase):
    """Challenge 27: recover the key from CBC with IV = key."""

    def test_ascii_compliant(self) -> None:
        """b"abc" passes the ASCII compliance check."""
        compliant = m27.ascii_compliant(b"abc")
        self.assertTrue(compliant)

    def test_oracle_plaintext(self) -> None:
        """The oracle returns None for an encryption of MESSAGE."""
        encrypted = m27.bad_cbc_encryption(MESSAGE)
        self.assertIsNone(m27.oracle(encrypted))

    def test_recover_cbc_key_with_equal_key_iv(self) -> None:
        """cbc_iv_key returns m27.RANDOM_KEY for an encryption of MESSAGE * 3."""
        encrypted = m27.bad_cbc_encryption(MESSAGE * 3)
        recovered_key = m27.cbc_iv_key(encrypted)
        self.assertEqual(recovered_key, m27.RANDOM_KEY)
class Test28(unittest.TestCase):
    """Challenge 28: implement a SHA-1 keyed MAC."""

    # NOTE(review): the four "matches hashlib.sha1" test_sha1_mac* cases hash
    # ``k + m`` with m28.SHA1 directly; only the cascade and different-key
    # cases call m28.sha1_mac.

    def test_sha1(self) -> None:
        """SHA1 agrees with hashlib.sha1 on a repeated message."""
        data = b"digest me" * 512
        expected = hashlib.sha1(data).hexdigest()
        actual = m28.SHA1(data).hexdigest()
        self.assertEqual(expected, actual)

    def test_sha1_pep_452(self) -> None:
        """SHA1 exposes the same digest_size, block_size and name as hashlib."""
        reference = hashlib.sha1()
        candidate = m28.SHA1()
        self.assertEqual(reference.digest_size, candidate.digest_size)
        self.assertEqual(reference.block_size, candidate.block_size)
        self.assertEqual(reference.name, candidate.name)

    def test_sha1_empty_message(self) -> None:
        """Every way of hashing the empty message gives the empty-input digest."""
        empty = b""
        digests = set()
        digests.add(m28.SHA1().hexdigest())
        digests.add(m28.SHA1(empty).hexdigest())
        # Constructed without data, then updated with the empty message.
        updated = m28.SHA1()
        updated.update(empty)
        digests.add(updated.hexdigest())
        # Constructed with the empty message, then updated with it again.
        seeded = m28.SHA1(empty)
        seeded.update(empty)
        digests.add(seeded.hexdigest())
        digests.add(hashlib.sha1(empty).hexdigest())
        self.assertEqual(digests, {"da39a3ee5e6b4b0d3255bfef95601890afd80709"})

    def test_sha1_long_input(self) -> None:
        """SHA1 agrees with hashlib.sha1 for zero-filled inputs of 0..512 bytes."""
        for length in range(513):
            zeros = bytes(length)
            expected = hashlib.sha1(zeros).hexdigest()
            actual = m28.SHA1(zeros).hexdigest()
            self.assertEqual(expected, actual)

    def test_sha1_mac(self) -> None:
        """SHA1 over key + message agrees with hashlib.sha1."""
        message = b"digest me" * 512
        key = b"it is authentic"
        expected = hashlib.sha1(key + message).hexdigest()
        actual = m28.SHA1(key + message).hexdigest()
        self.assertEqual(expected, actual)

    def test_sha1_mac_empty_message(self) -> None:
        """SHA1 over key + empty message agrees with hashlib.sha1."""
        message = b""
        key = b"it is authentic"
        expected = hashlib.sha1(key + message).hexdigest()
        actual = m28.SHA1(key + message).hexdigest()
        self.assertEqual(expected, actual)

    def test_sha1_mac_empty_key(self) -> None:
        """SHA1 over empty key + message agrees with hashlib.sha1."""
        message = b"not very authenticated"
        key = b""
        expected = hashlib.sha1(key + message).hexdigest()
        actual = m28.SHA1(key + message).hexdigest()
        self.assertEqual(expected, actual)

    def test_sha1_mac_empty_message_and_key(self) -> None:
        """SHA1 over empty key + empty message agrees with hashlib.sha1."""
        message = b""
        key = b""
        expected = hashlib.sha1(key + message).hexdigest()
        actual = m28.SHA1(key + message).hexdigest()
        self.assertEqual(expected, actual)

    def test_sha1_mac_cascade(self) -> None:
        """sha1_mac differs when one byte of the message changes."""
        key = b"it is authentic"
        genuine = m28.sha1_mac(b"digest me", key)
        tampered = m28.sha1_mac(b"digest he", key)
        self.assertNotEqual(genuine, tampered)

    def test_sha1_mac_different_key(self) -> None:
        """sha1_mac differs when the key changes."""
        message = b"digest me"
        genuine = m28.sha1_mac(message, b"it is authentic")
        forged = m28.sha1_mac(message, b"inauthentic")
        self.assertNotEqual(genuine, forged)

    def test_sha1_update(self) -> None:
        """Two update() calls hash the same as the concatenated input."""
        first = b"A" * 512
        second = b"B" * 512
        incremental = m28.SHA1()
        incremental.update(first)
        incremental.update(second)
        one_shot = m28.SHA1(first + second)
        self.assertEqual(incremental.hexdigest(), one_shot.hexdigest())
        self.assertEqual(incremental.hexdigest(),
                         hashlib.sha1(first + second).hexdigest())

    def test_sha1_initialisation(self) -> None:
        """Data given to the constructor hashes the same as data via update()."""
        data = b"could be anything"
        from_constructor = m28.SHA1(data)
        from_update = m28.SHA1()
        from_update.update(data)
        self.assertEqual(from_constructor.hexdigest(), from_update.hexdigest())

    def test_copy(self) -> None:
        """A copy of a SHA1 object has the same digest as the original."""
        original = m28.SHA1(MESSAGE)
        duplicate = original.copy()
        self.assertEqual(original.digest(), duplicate.digest())
class Test29(unittest.TestCase):
    """Challenge 29: break a SHA-1 keyed MAC using length extension."""

    def test_get_sha1_state(self) -> None:
        """State parsed from a digest equals state read from the object."""
        hasher = m28.SHA1()
        from_digest = m29.sha1_state_from_binary(hasher.digest())
        self.assertEqual(from_digest, m29.sha1_state_from_object(hasher))

    def test_break_sha1_mac_length_extension(self) -> None:
        """At least one extended MAC verifies for the extended message.

        The key is random and shorter than 50 bytes.  The test fails when no
        candidate from extend_sha1 is accepted by verify_sha1_mac.
        """
        m = b"comment1=cooking%20MCs;userdata=foo;" \
            b"comment2=%20like%20a%20pound%20of%20bacon"
        k = get_random_bytes(randrange(50))
        z = b";admin=true"
        # Server side: MAC of the original message and the message that the
        # forged MAC has to verify against.
        d = m28.sha1_mac(m, k)
        m_prime = m + bytearray(m29.md_padding(k + m)) + z
        # Client side: stop at the first candidate that verifies.
        forged = any(m29.verify_sha1_mac(candidate, m_prime, k)
                     for candidate in m29.extend_sha1(d, z))
        if not forged:
            self.fail("No extended HMAC validated")
class Test30(unittest.TestCase):
    """Challenge 30: break an MD4 keyed MAC using length extension."""

    def test_md4_test_vectors(self) -> None:
        """MD4 digests the RFC 1320 test vectors correctly.

        Each vector runs in its own subTest so that one failing vector does
        not hide the results of the others.
        """
        vectors = (
            (b"", "31d6cfe0d16ae931b73c59d7e0c089c0"),
            (b"a", "bde52cb31de33e46245e05fbdbd6fb24"),
            (b"abc", "a448017aaf21d8525fc10ae87aa6729d"),
            (b"message digest", "d9130a8164549fe818874806e1c7014b"),
            (b"abcdefghijklmnopqrstuvwxyz",
             "d79e1c308aa5bbcdeea8ed63df412da9"),
            (b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789",
             "043f8582f241db351ce627e153e7f0e4"),
            (b"1234567890123456789012345678901234567890"
             b"1234567890123456789012345678901234567890",
             "e33b4ddc9c38f2199c3e7b164fcc0536"),
        )
        for v, digest in vectors:
            with self.subTest(v=v):
                self.assertEqual(m30.MD4(v).hexdigest(), digest)

    def test_md4_long_input(self) -> None:
        """MD4 agrees with Crypto.Hash.MD4 for zero-filled inputs of 0..512 bytes."""
        for i in range(513):
            m = bytes(i)
            h = m30.MD4(m)
            h_prime = MD4.new(m)
            self.assertEqual(h.digest(), h_prime.digest())

    def test_md4_initialisation(self) -> None:
        """Data given to the constructor hashes the same as data via update()."""
        m = b"could be anything"
        h1 = m30.MD4(m)
        h2 = m30.MD4()
        h2.update(m)
        self.assertEqual(h1.hexdigest(), h2.hexdigest())

    def test_md4_update(self) -> None:
        """Two update() calls hash the same as the concatenated input."""
        m1 = b"A" * 512
        m2 = b"B" * 512
        h = m30.MD4()
        h.update(m1)
        h.update(m2)
        h_combined = m30.MD4(m1 + m2)
        self.assertEqual(h.hexdigest(), h_combined.hexdigest())

    def test_md4_mac(self) -> None:
        """MD4 over key + message agrees with Crypto.Hash.MD4."""
        m = b"digest me" * 512
        k = b"it is authentic"
        h = MD4.new(k + m).hexdigest()
        h_prime = m30.MD4(k + m).hexdigest()
        self.assertEqual(h, h_prime)

    def test_md4_mac_empty_message(self) -> None:
        """MD4 over key + empty message agrees with Crypto.Hash.MD4."""
        m = b""
        k = b"it is authentic"
        h = MD4.new(k + m).hexdigest()
        h_prime = m30.MD4(k + m).hexdigest()
        self.assertEqual(h, h_prime)

    def test_md4_mac_empty_key(self) -> None:
        """MD4 over empty key + message agrees with Crypto.Hash.MD4."""
        m = b"not very authenticated"
        k = b""
        h = MD4.new(k + m).hexdigest()
        h_prime = m30.MD4(k + m).hexdigest()
        self.assertEqual(h, h_prime)

    def test_md4_mac_empty_message_and_key(self) -> None:
        """MD4 over empty key + empty message agrees with Crypto.Hash.MD4."""
        m, k = b"", b""
        h = MD4.new(k + m).hexdigest()
        h_prime = m30.MD4(k + m).hexdigest()
        self.assertEqual(h, h_prime)

    def test_md4_mac_cascade(self) -> None:
        """md4_mac differs when one byte of the message changes."""
        m = b"digest me"
        m_prime = b"digest he"
        k = b"it is authentic"
        self.assertNotEqual(m30.md4_mac(m, k), m30.md4_mac(m_prime, k))

    def test_md4_mac_different_key(self) -> None:
        """md4_mac differs when the key changes."""
        m = b"digest me"
        k = b"it is authentic"
        k_prime = b"inauthentic"
        self.assertNotEqual(m30.md4_mac(m, k), m30.md4_mac(m, k_prime))

    def test_md4_copy(self) -> None:
        """A copy of an MD4 object has the same digest as the original."""
        h = m30.MD4(MESSAGE)
        h_prime = h.copy()
        self.assertEqual(h.digest(), h_prime.digest())

    def test_md4_state_from_hex(self) -> None:
        """State parsed from a hex digest equals state read from the object."""
        h = m30.MD4(b"fud for thought")
        register = m30.md4_state_from_hex(h.hexdigest())
        self.assertEqual(register, m30.md4_state_from_object(h))

    def test_md4_state_from_binary(self) -> None:
        """State parsed from a binary digest equals state read from the object."""
        h = m30.MD4(b"fud for thought")
        register = m30.md4_state_from_binary(h.digest())
        self.assertEqual(register, m30.md4_state_from_object(h))

    def test_verify_md4_mac(self) -> None:
        """verify_md4_mac accepts a MAC produced by md4_mac."""
        mac = m30.md4_mac(MESSAGE, KEY)
        self.assertTrue(m30.verify_md4_mac(mac, MESSAGE, KEY))

    def test_md_padding(self) -> None:
        """md_padding equals 0x80, zero fill, then the bit length (8 bytes, LE)."""
        data = b"llllll"
        padding = m30.md_padding(data)
        block_size = m30.MD4.block_size
        # The zero fill follows the 0x80 marker byte and stops 8 bytes short
        # of a block boundary.  The earlier expression applied "%" to the
        # literal 1 only (it binds tighter than "-"), which was a no-op; the
        # subtraction is now parenthesised.  The value is unchanged.
        zero_fill = (56 - len(data) - 1) % block_size
        padding_prime = b"\x80" \
            + bytes(zero_fill) \
            + (8 * len(data)).to_bytes(8, "little")
        self.assertEqual(padding, padding_prime)

    def test_extend_md4(self) -> None:
        """At least one extended MAC verifies for the extended message."""
        m = MESSAGE
        k = KEY
        z = b"No, dusk!"
        mac = m30.md4_mac(m, k)
        m_prime = m + bytearray(m30.md_padding(k + m)) + z
        verification = [m30.verify_md4_mac(q, m_prime, k)
                        for q in m30.extend_md4(mac, z)]
        self.assertIn(True, verification)
class Test31(unittest.TestCase):
    """Challenge 31: implement and break HMAC-SHA1 with an artificial timing leak."""

    def test_hmac_sha1(self) -> None:
        """hmac_sha1 agrees with hmac.new(key, message, sha1)."""
        candidate = m31.hmac_sha1(KEY, MESSAGE)
        reference = hmac.new(KEY, MESSAGE, hashlib.sha1)
        self.assertEqual(candidate.digest(), reference.digest())

    def test_hmac_sha1_large_key(self) -> None:
        """hmac_sha1 agrees with hmac for a key of 128 * len(KEY) bytes."""
        long_key = 128 * KEY
        candidate = m31.hmac_sha1(long_key, MESSAGE)
        reference = hmac.new(long_key, MESSAGE, hashlib.sha1)
        self.assertEqual(candidate.digest(), reference.digest())

    def test_hmac_sha1_equal_sized_key(self) -> None:
        """hmac_sha1 agrees with hmac for a 64-byte all-zero key."""
        zero_key = bytes(64)
        candidate = m31.hmac_sha1(zero_key, MESSAGE)
        reference = hmac.new(zero_key, MESSAGE, hashlib.sha1)
        self.assertEqual(candidate.digest(), reference.digest())

    def test_insecure_compare_identical(self) -> None:
        """insecure_compare is truthy for identical bytes."""
        sample = b"identical bytes"
        self.assertTrue(m31.insecure_compare(sample, sample, 0))

    def test_insecure_compare_different(self) -> None:
        """insecure_compare is falsy for bytes differing in the last position."""
        left = b"different bytes"
        right = b"different bytez"
        self.assertFalse(m31.insecure_compare(left, right, 0))

    def test_insecure_compare_different_size(self) -> None:
        """insecure_compare raises ValueError for inputs of different sizes."""
        self.assertRaises(ValueError,
                          m31.insecure_compare, b"different", b"sizes", 0)

    def test_verify_hmac_sha1_hex(self) -> None:
        """verify_hmac_sha1_hex accepts the encoded hex digest of hmac_sha1."""
        hex_digest = m31.hmac_sha1(KEY, MESSAGE).hexdigest()
        signature = hex_digest.encode()
        self.assertTrue(m31.verify_hmac_sha1_hex(MESSAGE, signature, KEY, 0))

    def test_instantiate_listener(self) -> None:
        """An HMACListener can be created, started and stopped."""
        address = ("localhost", 9131)
        listener = m31.HMACListener(address, KEY, delay=0.025)
        self.assertTrue(listener)
        listener.run()
        listener.stop()

    @unittest.skip("Extremely long test")
    def test_hmac_sha1_attack(self) -> None:
        """The attack recovers the hex HMAC of the file name from the listener."""
        file_name = b"foo"
        key = get_random_bytes(16)
        address = ("localhost", 9131)
        listener = m31.HMACListener(address, key, delay=0.025)
        listener.run()
        try:
            attack = m31.HMACAttack(address)
            recovered = attack.get_sha1_hmac(file_name.decode())
            expected = m31.hmac_sha1(key, file_name).hexdigest()
            self.assertEqual(recovered, expected)
        finally:
            # Stop the listener even when the attack or the assertion fails.
            listener.stop()
class Test32(unittest.TestCase):
    """Challenge 32: break HMAC-SHA1 with a slightly less artificial timing leak."""

    def test_instantiate_attack(self) -> None:
        """An m32.HMACAttack can be created and is truthy."""
        address = ("localhost", 9132)
        attack = m32.HMACAttack(address, 10)
        self.assertTrue(attack)

    @unittest.skip("Extremely long test")
    def test_hmac_sha1_attack(self) -> None:
        """The attack recovers the hex HMAC of the file name from the listener."""
        file_name = b"foo"
        key = get_random_bytes(16)
        address = ("localhost", 9132)
        listener = m31.HMACListener(address, key, delay=0.005)
        attack = m32.HMACAttack(address, 10)
        listener.run()
        try:
            recovered = attack.get_sha1_hmac(file_name.decode())
            expected = m31.hmac_sha1(key, file_name).hexdigest()
            self.assertEqual(recovered, expected)
        finally:
            # Stop the listener even when the attack or the assertion fails.
            listener.stop()
class Test33(unittest.TestCase):
    """Challenge 33: implement Diffie-Hellman."""

    def test_dh_key_exchange(self) -> None:
        """Both values returned by dh_key_exchange are equal."""
        secret_a, secret_b = m33.dh_key_exchange(PRIME, G)
        self.assertEqual(secret_a, secret_b)
class Test34(unittest.TestCase):
    """Challenge 34: MITM key-fixing attack on Diffie-Hellman
    with parameter injection."""

    def test_dh_protocol(self) -> None:
        """dh_protocol returns the message that was sent."""
        delivered = m34.dh_protocol(PRIME, G, MESSAGE)
        self.assertEqual(delivered, MESSAGE)

    def test_dh_parameter_injection(self) -> None:
        """dh_parameter_injection returns the message that was sent."""
        captured = m34.dh_parameter_injection(PRIME, G, MESSAGE)
        self.assertEqual(captured, MESSAGE)
class Test35(unittest.TestCase):
    """Challenge 35: DH with negotiated groups,
    broken with malicious g parameters."""

    def tearDown(self) -> None:
        """Force a garbage collection pass after every test."""
        # NOTE(review): presumably releases resources held by the peers of
        # the previous test -- confirm against m35.
        gc.collect()

    def test_dh_protocol(self) -> None:
        """dh_protocol returns the message that was sent."""
        received = m35.dh_protocol(PRIME, G, MESSAGE)
        self.assertEqual(received, MESSAGE)

    def test_dh_malicious_g_is_1(self) -> None:
        """dh_malicious_g returns MESSAGE when the injected g is 1."""
        received = m35.dh_malicious_g(PRIME, G, MESSAGE, 1)
        self.assertEqual(received, MESSAGE)

    def test_dh_malicious_g_is_p(self) -> None:
        """dh_malicious_g returns MESSAGE when the injected g is p."""
        received = m35.dh_malicious_g(PRIME, G, MESSAGE, PRIME)
        self.assertEqual(received, MESSAGE)

    def test_dh_malicious_g_is_p_minus_1(self) -> None:
        """dh_malicious_g returns MESSAGE when the injected g is p - 1."""
        received = m35.dh_malicious_g(PRIME, G, MESSAGE, PRIME - 1)
        self.assertEqual(received, MESSAGE)
class Test36(unittest.TestCase):
    """Challenge 36: implement Secure Remote Password (SRP)."""

    # NOTE(review): the "[email protected]" literals below look like web-page
    # e-mail obfuscation placeholders -- confirm the real addresses against
    # the repository before applying this block.

    def test_client_pubkey(self) -> None:
        """Client.pubkey() returns the value stored in Client.A."""
        carol = m36.Client(PRIME, "[email protected]", "pw", 2, 3)
        carol.pubkey()
        self.assertEqual(carol.A, carol.pubkey())

    def test_server_pubkey(self) -> None:
        """Server.pubkey() returns the value stored in Server.B."""
        parameters: m36.Parameters = {"N": PRIME,
                                      "g": 2,
                                      "k": 3,
                                      "I": "[email protected]",
                                      "p": "password"
                                      }
        steve = m36.Server()
        steve.negotiate_receive(parameters)
        steve.verifier()
        steve.pubkey()
        self.assertEqual(steve.B, steve.pubkey())

    def test_bad_email(self) -> None:
        """receive_email_pubkey raises ValueError for a mismatched email."""
        parameters: m36.Parameters = {"N": PRIME,
                                      "g": 2,
                                      "k": 3,
                                      "I": "[email protected]",
                                      "p": "password"
                                      }
        steve = m36.Server()
        steve.negotiate_receive(parameters)
        # The address sent here must differ from the negotiated one; with
        # identical literals the mismatch path could never be exercised.
        bad_parameters: m36.EmailPubKey = {"I": "mallory@example.com",
                                           "pubkey": 123}
        with self.assertRaises(ValueError):
            steve.receive_email_pubkey(bad_parameters)

    def test_bad_hmac(self) -> None:
        """receive_hmac returns a falsy response for a wrong HMAC."""
        parameters: m36.Parameters = {"N": PRIME,
                                      "g": 2,
                                      "k": 3,
                                      "I": "[email protected]",
                                      "p": "password"
                                      }
        steve = m36.Server()
        steve.negotiate_receive(parameters)
        steve.scrambler()
        steve.verifier()
        steve.pubkey()
        # Set the client public key by hand instead of running a client.
        steve.A = 123
        steve.gen_K()
        response = steve.receive_hmac("deadbeef")
        self.assertFalse(response)

    def test_srp_protocol(self) -> None:
        """srp_protocol returns a truthy result for a client and a server."""
        carol = m36.Client(m36.prime(), email="[email protected]",
                           password="submarines")
        steve = m36.Server()
        result = m36.srp_protocol(carol, steve)
        self.assertTrue(result)
class Test37(unittest.TestCase):
    """Challenge 37: break SRP with a zero key."""

    def test_srp_zero_key(self) -> None:
        """srp_protocol succeeds when the client's A is forced to 0."""
        client = m37.CustomKeyClient(m36.prime(), email="[email protected]",
                                     password="submarines")
        server = m36.Server()
        client.A = 0
        self.assertTrue(m36.srp_protocol(client, server))

    def test_srp_multiple_prime_key(self) -> None:
        """srp_protocol succeeds when A is 1, 2 or 3 times the prime."""
        prime = m36.prime()
        for multiple in (1, 2, 3):
            with self.subTest(i=multiple):
                client = m37.CustomKeyClient(prime, email="[email protected]",
                                             password="submarines")
                server = m36.Server()
                client.A = multiple * prime
                self.assertTrue(m36.srp_protocol(client, server))
class Test38(unittest.TestCase):
    """Challenge 38: offline dictionary attack on simplified SRP."""

    def test_simple_srp(self) -> None:
        """simple_srp returns a truthy result for a client and a server."""
        carol = m38.SimpleClient(PRIME, G,
                                 username="[email protected]",
                                 password="yolo")
        steve = m38.SimpleServer(PRIME, G)
        outcome = m38.simple_srp(carol, steve)
        self.assertTrue(outcome)

    @unittest.skip("Requires a wordlist file")
    def test_mitm_simple_srp(self) -> None:
        """mitm_simple_srp returns the client's password (wordlist file)."""
        prime = m36.prime()
        secret = "aardvark"
        carol = m38.SimpleClient(prime, G,
                                 username="[email protected]",
                                 password=secret)
        mallory = m38.EvilServer(prime, G)
        guess = m38.mitm_simple_srp(carol, mallory)
        self.assertEqual(guess, secret)

    @mock.patch.object(m38.EvilServer, "_words")
    def test_mitm_simple_srp_no_io(self, _words: mock.Mock) -> None:
        """mitm_simple_srp returns the client's password (patched word list).

        EvilServer._words is patched so that it returns only the password.
        """
        prime = m36.prime()
        secret = "aardvark"
        _words.return_value = [secret]
        carol = m38.SimpleClient(prime, G,
                                 username="[email protected]",
                                 password=secret)
        mallory = m38.EvilServer(prime, G)
        guess = m38.mitm_simple_srp(carol, mallory)
        self.assertEqual(guess, secret)
class Test39(unittest.TestCase):
"""Implement RSA"""
def test_gcd(self) -> None:
"""Sanity check Euclidean GCD"""
interval = range(-10, 10)
for a in interval:
for b in interval:
self.assertEqual(m39.gcd(a, b), math.gcd(a, b))