From e9c40f90ac2d8c06597da3f7321b34e57f814509 Mon Sep 17 00:00:00 2001 From: Philip Axer Date: Mon, 9 Dec 2024 12:03:38 +0100 Subject: [PATCH] Added initial support for MACsec (#672) Signed-off-by: Philip Axer --- AUTHORS | 3 + dpkt/ethernet.py | 277 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 275 insertions(+), 5 deletions(-) diff --git a/AUTHORS b/AUTHORS index 23a498c3..cc763725 100644 --- a/AUTHORS +++ b/AUTHORS @@ -8,6 +8,9 @@ Dug Song Contributors ------------ +Philip Axer + MACsec support + Timur Alperovich radiotap module diff --git a/dpkt/ethernet.py b/dpkt/ethernet.py index 86cb1d08..932b2c90 100644 --- a/dpkt/ethernet.py +++ b/dpkt/ethernet.py @@ -15,6 +15,12 @@ from .utils import mac_to_str from .compat import compat_ord, iteritems, isstr +try: + from Crypto.Cipher import AES + crypto_support = True +except ImportError: + crypto_support = False + ETH_CRC_LEN = 4 ETH_HDR_LEN = 14 @@ -48,6 +54,7 @@ ETH_TYPE_LLDP = 0x88CC # Link Layer Discovery Protocol ETH_TYPE_TEB = 0x6558 # Transparent Ethernet Bridging ETH_TYPE_PROFINET = 0x8892 # PROFINET protocol +ETH_TYPE_MACSEC = 0x88E5 # MAC security # all QinQ types for fast checking _ETH_TYPES_QINQ = frozenset([ETH_TYPE_8021Q, ETH_TYPE_8021AD, ETH_TYPE_QINQ1, ETH_TYPE_QINQ2]) @@ -76,14 +83,19 @@ class Ethernet(dpkt.Packet): __pprint_funcs__ = { 'dst': mac_to_str, - 'src': mac_to_str, + 'src': mac_to_str } def __init__(self, *args, **kwargs): self._next_type = None + # We are setting these directly here. in case dpkt.Packet.__init__ is called with data as the first argument + # all the other arguments will be ignored + self.macsec_ciphersuite = kwargs.get('macsec_ciphersuite', None) + self.macsec_sak = kwargs.get('macsec_sak', None) + dpkt.Packet.__init__(self, *args, **kwargs) # if data was given in kwargs, try to unpack it - if self.data: + if kwargs.get('data', None): if isstr(self.data) or isinstance(self.data, bytes): self._unpack_data(self.data) @@ -128,14 +140,57 @@ def _unpack_data(self, buf): if buf[:2] == b'\x00\x00': # looks like the control word (ECW) buf = buf[4:] # skip the ECW self._next_type = ETH_TYPE_TEB # re-use TEB class mapping to decode Ethernet - + elif self._next_type == ETH_TYPE_MACSEC: + buf = self._unpack_macsec(buf) try: eth_type = self._next_type or self.type self.data = self._typesw[eth_type](buf) + setattr(self, self.data.__class__.__name__.lower(), self.data) except (KeyError, dpkt.UnpackError): self.data = buf + def _unpack_macsec(self, buf): + if self.macsec_ciphersuite is None: + raise dpkt.PackError("MACsec ciphersuite is not set, unable to decode MACsec frame") + + self.sec_tag = MACsec(buf) + offset = self.sec_tag.__hdr_len__ + if self.sec_tag.sc: + self.sec_tag.sci = buf[offset : offset + 8] + offset += 8 # SCI is 8 bytes long + + buf = buf[offset:] + + if self.sec_tag.sl == 0: + self.sec_tag.icv = buf[-self.macsec_ciphersuite.icv_len:] + buf = buf[:-self.macsec_ciphersuite.icv_len] + else: + if len(buf) < self.sec_tag.sl: + raise dpkt.PackError("Not enough data to unpack MACsec frame") + + self.sec_tag.icv = buf[self.sec_tag.sl : self.sec_tag.sl + self.macsec_ciphersuite.icv_len] + buf = buf[:self.sec_tag.sl] + + + if not crypto_support: + # in case we are not able to decrypt the frame, we just leave the data as is + self.data = buf + self._next_type = 0 + + if(self.sec_tag.e and self.sec_tag.c): + # Confientiality and Integrity + nonce = self.sec_tag.sci + struct.pack('>I', self.sec_tag.pn) + cipher = AES.new(self.macsec_sak, AES.MODE_GCM, mac_len = self.macsec_ciphersuite.icv_len, nonce = nonce) + buf = cipher.decrypt(buf) + + self._next_type = struct.unpack('>H', buf[0:2])[0] + return buf[2:] + else: + # Integrity + self._next_type = struct.unpack('>H', buf[0:2])[0] + return buf[2:] + def unpack(self, buf): dpkt.Packet.unpack(self, buf) if self.type > 1500: @@ -200,6 +255,7 @@ def unpack(self, buf): def pack_hdr(self): tags_buf = b'' + sec_tag_buf = b'' new_type = self.type # replacement self.type when packing eth header is_isl = False # ISL wraps Ethernet, this determines order of packing @@ -214,6 +270,17 @@ def pack_hdr(self): new_type = ETH_TYPE_MPLS tags_buf = b''.join(lbl.pack_hdr() for lbl in self.mpls_labels) + elif getattr(self, 'sec_tag', None): + last_tag_type = self.type # default + new_type = ETH_TYPE_MACSEC + if isinstance(self.data, dpkt.Packet): + last_tag_type = self._typesw_rev.get(self.data.__class__, self.type) + sec_tag = self.sec_tag + sec_tag.type = last_tag_type + if self.macsec_ciphersuite is None: + raise dpkt.PackError("MACsec ciphersuite is not set") + sec_tag_buf = sec_tag.pack_hdr() + elif getattr(self, 'vlan_tags', None): # set last tag type to next layer pointed by self.data last_tag_type = self.type # default @@ -253,7 +320,7 @@ def pack_hdr(self): hdr_buf = dpkt.Packet.pack_hdr(self)[:-2] + struct.pack('>H', new_type) if not is_isl: - return hdr_buf + tags_buf + return hdr_buf + sec_tag_buf + tags_buf else: return tags_buf + hdr_buf @@ -276,8 +343,48 @@ def __bytes__(self): fcs = struct.unpack('I', revcrc))[0] # bswap32 fcs = struct.pack('>I', fcs) tail = getattr(self, 'padding', b'') + fcs + getattr(self, 'trailer', b'') + + if getattr(self, 'sec_tag', None) is not None: + return self._bytes_macsec() + return bytes(dpkt.Packet.__bytes__(self) + tail) + def _bytes_macsec(self): + if not crypto_support: + # Leave the payload data in clear and add a dummy ICV of zeros + # This results in an invalid MACsec frame, but it's the best we can do without the crypto library + return bytes(self.pack_hdr() + bytes(self.data) + self.sec_tag.icv) + + hdr_data = self.pack_hdr() + + if getattr(self, 'macsec_sak', None) is None: + raise dpkt.PackError("Not able to pack MACsec frame. SAK not set") + + if(self.macsec_ciphersuite.blocksize / 8 != len(self.macsec_sak)): + raise dpkt.PackError("SAK length does not match the blocksize of the ciphersuite") + + if(isinstance(self.data, dpkt.Packet)): + # When the next layer is a dpkt.Packet, we take the ethertype from the sec_tag + data = struct.pack('>H', self.sec_tag.type) + bytes(self.data) + else: + # Othwerise, we asume the userdata is raw data and includes the ethertype already + data = bytes(self.data) + + # For the XPN case, we need to include 64bits of PN. Not supported right now + iv = self.sec_tag.sci + struct.pack('>I', self.sec_tag.pn) + cipher = AES.new(self.macsec_sak, AES.MODE_GCM, mac_len = self.macsec_ciphersuite.icv_len, nonce = iv) + + if self.sec_tag.e and self.sec_tag.c: + cipher.update(hdr_data) + data, self.sec_tag.icv = cipher.encrypt_and_digest(data) + + return bytes(hdr_data + data + self.sec_tag.icv) + else: + cipher.update(hdr_data + data) + self.sec_tag.icv = cipher.digest() + + return bytes(hdr_data + data + self.sec_tag.icv) + def __len__(self): tags = getattr(self, 'mpls_labels', []) + getattr(self, 'vlan_tags', []) _len = dpkt.Packet.__len__(self) + sum(t.__hdr_len__ for t in tags) @@ -319,14 +426,92 @@ def __load_types(): def _mod_init(): - """Post-initialization called when all dpkt modules are fully loaded""" + """Post-initialization called when all dpkt modules are fully loaded""" if not Ethernet._typesw: __load_types() # Misc protocols +ICV_LEN = { + 'AES-GCM-128': 16, + 'AES-GCM-256': 16, +} + +MACSEC_CIPHERSUITES = { + 'AES-GCM-128' : MACsecCipherSuite('AES', 128, 'GCM'), + 'AES-GCM-256' : MACsecCipherSuite('AES', 256, 'GCM'), +} + +class MACsecCipherSuite(object): + def __init__(self, cipher, blocksize, mode): + self._cipher = cipher + self._blocksize = blocksize + self._mode = mode + @property + def name(self): + return self._cipher + "-" + self._mode + "-" + str(self.blocksize) + + @property + def blocksize(self): + return self._blocksize + + @property + def icv_len(self): + return ICV_LEN[self.name] + + def __repr__(self): + return self.name + +class MACsec(dpkt.Packet): + """IEEE 802.1AE MACsec tag""" + + __hdr__ = ( + ('_tci_an', 'B', 0), + ('sl', 'B', 0), # short length + ('pn', 'L', 0), # packet number (LSBs for XPN) + # We cannot have 'type' as as a field to indicate the next layer type + # his is because type is potentially encrypted and can not be serialized as part of the header + ) + + __bit_fields__ = { + '_tci_an': ( + ('v', 1), # version, 1 bit + ('es', 1), # end station, 1 bit + ('sc', 1), # secure channel explicitly encoded, 1 bit + ('scb', 1), # EPON single copy broadcast, 1 bit + ('e', 1), # Encryption bit, 1 bit + ('c', 1), # Changed Text bit, 1 bit + ('an', 2), # association number, 2 bits + ) + } + + def __init__(self, *args, **kwargs): + self.sci = b'\0\0\0\0\0\0\0\0' # 8 bytes sci + self.icv = b'' # integrity check value, this is the trailer + self.type = 0 # next layer type + + dpkt.Packet.__init__(self, *args, **kwargs) + + def unpack(self, buf): + dpkt.Packet.unpack(self, buf) + self.data = b'' + + def pack_hdr(self): + sci = b'' + if self.sc: + sci = self.sci + if len(sci) != 8: + raise dpkt.PackError("SCI must be 8 bytes long") + return dpkt.Packet.pack_hdr(self) + sci + + def __len__(self): + sci_len = 0 + if self.sc: + sci_len = 8 + return self.__hdr_len__ + sci_len + class MPLSlabel(dpkt.Packet): """A single entry in MPLS label stack""" @@ -873,3 +1058,85 @@ def test_eth_novell(): assert isinstance(eth.data, dpkt.ipx.IPX) assert eth.data.tc == 2 assert eth.data.data == b'' + + +def test_macsec_c11(): + # C.1.1 GCM-AES-128 (54-octet frame integrity protection) + + userdata = """08 00 0F 10 11 12 13 14 15 16 17 18 19 1A 1B 1C + 1D 1E 1F 20 21 22 23 24 25 26 27 28 29 2A 2B 2C + 2D 2E 2F 30 31 32 33 34 00 01""".strip().replace("\n", "").replace(" ", "") + + c11 = """ + D6 09 B1 F0 56 63 + 7A 0D 46 DF 99 8D + 88 E5 + 22 + 2A + B2 C2 84 65 + 12 15 35 24 C0 89 5E 81 + 08 00 0F 10 11 12 13 14 15 16 17 18 19 1A 1B 1C + 1D 1E 1F 20 21 22 23 24 25 26 27 28 29 2A 2B 2C + 2D 2E 2F 30 31 32 33 34 00 01 + F0 94 78 A9 B0 90 07 D0 6F 46 E9 B6 A1 DA 25 DD + """.strip().replace("\n", "").replace(" ", "") + userdata = bytearray.fromhex(userdata) + c11 = bytearray.fromhex(c11) + + sak = bytearray.fromhex("AD7A2BD03EAC835A6F620FDCB506B345") + + e = Ethernet(dst= bytearray([0xD6, 0x09, 0xB1, 0xF0, 0x56, 0x63]), + src=bytearray([0x7A, 0x0D, 0x46, 0xDF, 0x99, 0x8D]), + sec_tag=MACsec(sc = 1, an = 0x2, e=0, c=0, pn = 0xB2C28465, sl = 0x2a, + sci=bytearray([0x12, 0x15, 0x35, 0x24, 0xC0, 0x89, 0x5E, 0x81])), + macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'], + macsec_sak = sak, + data=userdata) + assert(e.pack() == c11) + + # Check if we can unpack a packet and get the same result for packing + e2 = Ethernet(c11, macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'], macsec_sak = sak) + assert(e2.pack() == c11) + + + +def test_macsec_c71(): + # C.7.1 GCM-AES-128 (61-octet frame confidentiality protection) + sak = bytearray.fromhex("013FE00B5F11BE7F866D0CBBC55A7A90") + + userdata = bytearray([ + 0x08, 0x00, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, + 0x1D, 0x1E, 0x1F, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, + 0x2D, 0x2E, 0x2F, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, 0x00, + 0x06 + ]) + e = Ethernet(dst= bytearray([0x84, 0xC5, 0xD5, 0x13, 0xD2, 0xAA]), + src=bytearray([0xF6, 0xE5, 0xBB, 0xD2, 0x72, 0x77]), + sec_tag=MACsec(sc = 1, an = 0x3, e=1, c=1, pn = 0x8932d612, + sci=bytearray([0x7C, 0xFD, 0xE9, 0xF9, 0xE3, 0x37, 0x24, 0xC6])), + macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'], + macsec_sak = sak, + data=userdata) + c71 = """ + 84 C5 D5 13 D2 AA + F6 E5 BB D2 72 77 + 88 E5 + 2F + 00 + 89 32 D6 12 + 7C FD E9 F9 E3 37 24 C6 + 3A 4D E6 FA 32 19 10 14 DB B3 03 D9 2E E3 A9 E8 + A1 B5 99 C1 4D 22 FB 08 00 96 E1 38 11 81 6A 3C + 9C 9B CF 7C 1B 9B 96 DA 80 92 04 E2 9D 0E 2A 76 + 42 + BF D3 10 A4 83 7C 81 6C CF A5 AC 23 AB 00 39 88 + """.strip().replace("\n", "").replace(" ", "") + c71 = bytearray.fromhex(c71) + assert (e.pack() == c71) + + + e2 = Ethernet(c71, + macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'], + macsec_sak = sak,) + + assert (e2.pack() == c71) \ No newline at end of file