Skip to content

Commit

Permalink
Added initial support for MACsec (kbandla#672)
Browse files Browse the repository at this point in the history
Signed-off-by: Philip Axer <[email protected]>
  • Loading branch information
philipaxer committed Dec 9, 2024
1 parent 4f8958e commit e9c40f9
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 5 deletions.
3 changes: 3 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ Dug Song <[email protected]>
Contributors
------------

Philip Axer <[email protected]>
MACsec support

Timur Alperovich <[email protected]>
radiotap module

Expand Down
277 changes: 272 additions & 5 deletions dpkt/ethernet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
from .utils import mac_to_str
from .compat import compat_ord, iteritems, isstr

try:
from Crypto.Cipher import AES
crypto_support = True
except ImportError:
crypto_support = False

ETH_CRC_LEN = 4
ETH_HDR_LEN = 14

Expand Down Expand Up @@ -48,6 +54,7 @@
ETH_TYPE_LLDP = 0x88CC # Link Layer Discovery Protocol
ETH_TYPE_TEB = 0x6558 # Transparent Ethernet Bridging
ETH_TYPE_PROFINET = 0x8892 # PROFINET protocol
ETH_TYPE_MACSEC = 0x88E5 # MAC security

# all QinQ types for fast checking
_ETH_TYPES_QINQ = frozenset([ETH_TYPE_8021Q, ETH_TYPE_8021AD, ETH_TYPE_QINQ1, ETH_TYPE_QINQ2])
Expand Down Expand Up @@ -76,14 +83,19 @@ class Ethernet(dpkt.Packet):

__pprint_funcs__ = {
'dst': mac_to_str,
'src': mac_to_str,
'src': mac_to_str
}

def __init__(self, *args, **kwargs):
self._next_type = None
# We are setting these directly here. in case dpkt.Packet.__init__ is called with data as the first argument
# all the other arguments will be ignored
self.macsec_ciphersuite = kwargs.get('macsec_ciphersuite', None)
self.macsec_sak = kwargs.get('macsec_sak', None)

dpkt.Packet.__init__(self, *args, **kwargs)
# if data was given in kwargs, try to unpack it
if self.data:
if kwargs.get('data', None):
if isstr(self.data) or isinstance(self.data, bytes):
self._unpack_data(self.data)

Expand Down Expand Up @@ -128,14 +140,57 @@ def _unpack_data(self, buf):
if buf[:2] == b'\x00\x00': # looks like the control word (ECW)
buf = buf[4:] # skip the ECW
self._next_type = ETH_TYPE_TEB # re-use TEB class mapping to decode Ethernet

elif self._next_type == ETH_TYPE_MACSEC:
buf = self._unpack_macsec(buf)
try:
eth_type = self._next_type or self.type
self.data = self._typesw[eth_type](buf)

setattr(self, self.data.__class__.__name__.lower(), self.data)
except (KeyError, dpkt.UnpackError):
self.data = buf

def _unpack_macsec(self, buf):
if self.macsec_ciphersuite is None:
raise dpkt.PackError("MACsec ciphersuite is not set, unable to decode MACsec frame")

self.sec_tag = MACsec(buf)
offset = self.sec_tag.__hdr_len__
if self.sec_tag.sc:
self.sec_tag.sci = buf[offset : offset + 8]
offset += 8 # SCI is 8 bytes long

buf = buf[offset:]

if self.sec_tag.sl == 0:
self.sec_tag.icv = buf[-self.macsec_ciphersuite.icv_len:]
buf = buf[:-self.macsec_ciphersuite.icv_len]
else:
if len(buf) < self.sec_tag.sl:
raise dpkt.PackError("Not enough data to unpack MACsec frame")

self.sec_tag.icv = buf[self.sec_tag.sl : self.sec_tag.sl + self.macsec_ciphersuite.icv_len]
buf = buf[:self.sec_tag.sl]


if not crypto_support:
# in case we are not able to decrypt the frame, we just leave the data as is
self.data = buf
self._next_type = 0

if(self.sec_tag.e and self.sec_tag.c):
# Confientiality and Integrity
nonce = self.sec_tag.sci + struct.pack('>I', self.sec_tag.pn)
cipher = AES.new(self.macsec_sak, AES.MODE_GCM, mac_len = self.macsec_ciphersuite.icv_len, nonce = nonce)
buf = cipher.decrypt(buf)

self._next_type = struct.unpack('>H', buf[0:2])[0]
return buf[2:]
else:
# Integrity
self._next_type = struct.unpack('>H', buf[0:2])[0]
return buf[2:]

def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
if self.type > 1500:
Expand Down Expand Up @@ -200,6 +255,7 @@ def unpack(self, buf):

def pack_hdr(self):
tags_buf = b''
sec_tag_buf = b''
new_type = self.type # replacement self.type when packing eth header
is_isl = False # ISL wraps Ethernet, this determines order of packing

Expand All @@ -214,6 +270,17 @@ def pack_hdr(self):
new_type = ETH_TYPE_MPLS
tags_buf = b''.join(lbl.pack_hdr() for lbl in self.mpls_labels)

elif getattr(self, 'sec_tag', None):
last_tag_type = self.type # default
new_type = ETH_TYPE_MACSEC
if isinstance(self.data, dpkt.Packet):
last_tag_type = self._typesw_rev.get(self.data.__class__, self.type)
sec_tag = self.sec_tag
sec_tag.type = last_tag_type
if self.macsec_ciphersuite is None:
raise dpkt.PackError("MACsec ciphersuite is not set")
sec_tag_buf = sec_tag.pack_hdr()

elif getattr(self, 'vlan_tags', None):
# set last tag type to next layer pointed by self.data
last_tag_type = self.type # default
Expand Down Expand Up @@ -253,7 +320,7 @@ def pack_hdr(self):

hdr_buf = dpkt.Packet.pack_hdr(self)[:-2] + struct.pack('>H', new_type)
if not is_isl:
return hdr_buf + tags_buf
return hdr_buf + sec_tag_buf + tags_buf
else:
return tags_buf + hdr_buf

Expand All @@ -276,8 +343,48 @@ def __bytes__(self):
fcs = struct.unpack('<I', struct.pack('>I', revcrc))[0] # bswap32
fcs = struct.pack('>I', fcs)
tail = getattr(self, 'padding', b'') + fcs + getattr(self, 'trailer', b'')

if getattr(self, 'sec_tag', None) is not None:
return self._bytes_macsec()

return bytes(dpkt.Packet.__bytes__(self) + tail)

def _bytes_macsec(self):
if not crypto_support:
# Leave the payload data in clear and add a dummy ICV of zeros
# This results in an invalid MACsec frame, but it's the best we can do without the crypto library
return bytes(self.pack_hdr() + bytes(self.data) + self.sec_tag.icv)

hdr_data = self.pack_hdr()

if getattr(self, 'macsec_sak', None) is None:
raise dpkt.PackError("Not able to pack MACsec frame. SAK not set")

if(self.macsec_ciphersuite.blocksize / 8 != len(self.macsec_sak)):
raise dpkt.PackError("SAK length does not match the blocksize of the ciphersuite")

if(isinstance(self.data, dpkt.Packet)):
# When the next layer is a dpkt.Packet, we take the ethertype from the sec_tag
data = struct.pack('>H', self.sec_tag.type) + bytes(self.data)
else:
# Othwerise, we asume the userdata is raw data and includes the ethertype already
data = bytes(self.data)

# For the XPN case, we need to include 64bits of PN. Not supported right now
iv = self.sec_tag.sci + struct.pack('>I', self.sec_tag.pn)
cipher = AES.new(self.macsec_sak, AES.MODE_GCM, mac_len = self.macsec_ciphersuite.icv_len, nonce = iv)

if self.sec_tag.e and self.sec_tag.c:
cipher.update(hdr_data)
data, self.sec_tag.icv = cipher.encrypt_and_digest(data)

return bytes(hdr_data + data + self.sec_tag.icv)
else:
cipher.update(hdr_data + data)
self.sec_tag.icv = cipher.digest()

return bytes(hdr_data + data + self.sec_tag.icv)

def __len__(self):
tags = getattr(self, 'mpls_labels', []) + getattr(self, 'vlan_tags', [])
_len = dpkt.Packet.__len__(self) + sum(t.__hdr_len__ for t in tags)
Expand Down Expand Up @@ -319,14 +426,92 @@ def __load_types():


def _mod_init():
"""Post-initialization called when all dpkt modules are fully loaded"""
"""Post-initialization called when all dpkt modules are fully loaded"""
if not Ethernet._typesw:
__load_types()


# Misc protocols

ICV_LEN = {
'AES-GCM-128': 16,
'AES-GCM-256': 16,
}

MACSEC_CIPHERSUITES = {
'AES-GCM-128' : MACsecCipherSuite('AES', 128, 'GCM'),
'AES-GCM-256' : MACsecCipherSuite('AES', 256, 'GCM'),
}

class MACsecCipherSuite(object):
def __init__(self, cipher, blocksize, mode):
self._cipher = cipher
self._blocksize = blocksize
self._mode = mode

@property
def name(self):
return self._cipher + "-" + self._mode + "-" + str(self.blocksize)

@property
def blocksize(self):
return self._blocksize

@property
def icv_len(self):
return ICV_LEN[self.name]

def __repr__(self):
return self.name

class MACsec(dpkt.Packet):
"""IEEE 802.1AE MACsec tag"""

__hdr__ = (
('_tci_an', 'B', 0),
('sl', 'B', 0), # short length
('pn', 'L', 0), # packet number (LSBs for XPN)
# We cannot have 'type' as as a field to indicate the next layer type
# his is because type is potentially encrypted and can not be serialized as part of the header
)

__bit_fields__ = {
'_tci_an': (
('v', 1), # version, 1 bit
('es', 1), # end station, 1 bit
('sc', 1), # secure channel explicitly encoded, 1 bit
('scb', 1), # EPON single copy broadcast, 1 bit
('e', 1), # Encryption bit, 1 bit
('c', 1), # Changed Text bit, 1 bit
('an', 2), # association number, 2 bits
)
}

def __init__(self, *args, **kwargs):
self.sci = b'\0\0\0\0\0\0\0\0' # 8 bytes sci
self.icv = b'' # integrity check value, this is the trailer
self.type = 0 # next layer type

dpkt.Packet.__init__(self, *args, **kwargs)

def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = b''

def pack_hdr(self):
sci = b''
if self.sc:
sci = self.sci
if len(sci) != 8:
raise dpkt.PackError("SCI must be 8 bytes long")
return dpkt.Packet.pack_hdr(self) + sci

def __len__(self):
sci_len = 0
if self.sc:
sci_len = 8
return self.__hdr_len__ + sci_len

class MPLSlabel(dpkt.Packet):
"""A single entry in MPLS label stack"""

Expand Down Expand Up @@ -873,3 +1058,85 @@ def test_eth_novell():
assert isinstance(eth.data, dpkt.ipx.IPX)
assert eth.data.tc == 2
assert eth.data.data == b''


def test_macsec_c11():
# C.1.1 GCM-AES-128 (54-octet frame integrity protection)

userdata = """08 00 0F 10 11 12 13 14 15 16 17 18 19 1A 1B 1C
1D 1E 1F 20 21 22 23 24 25 26 27 28 29 2A 2B 2C
2D 2E 2F 30 31 32 33 34 00 01""".strip().replace("\n", "").replace(" ", "")

c11 = """
D6 09 B1 F0 56 63
7A 0D 46 DF 99 8D
88 E5
22
2A
B2 C2 84 65
12 15 35 24 C0 89 5E 81
08 00 0F 10 11 12 13 14 15 16 17 18 19 1A 1B 1C
1D 1E 1F 20 21 22 23 24 25 26 27 28 29 2A 2B 2C
2D 2E 2F 30 31 32 33 34 00 01
F0 94 78 A9 B0 90 07 D0 6F 46 E9 B6 A1 DA 25 DD
""".strip().replace("\n", "").replace(" ", "")
userdata = bytearray.fromhex(userdata)
c11 = bytearray.fromhex(c11)

sak = bytearray.fromhex("AD7A2BD03EAC835A6F620FDCB506B345")

e = Ethernet(dst= bytearray([0xD6, 0x09, 0xB1, 0xF0, 0x56, 0x63]),
src=bytearray([0x7A, 0x0D, 0x46, 0xDF, 0x99, 0x8D]),
sec_tag=MACsec(sc = 1, an = 0x2, e=0, c=0, pn = 0xB2C28465, sl = 0x2a,
sci=bytearray([0x12, 0x15, 0x35, 0x24, 0xC0, 0x89, 0x5E, 0x81])),
macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'],
macsec_sak = sak,
data=userdata)
assert(e.pack() == c11)

# Check if we can unpack a packet and get the same result for packing
e2 = Ethernet(c11, macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'], macsec_sak = sak)
assert(e2.pack() == c11)



def test_macsec_c71():
# C.7.1 GCM-AES-128 (61-octet frame confidentiality protection)
sak = bytearray.fromhex("013FE00B5F11BE7F866D0CBBC55A7A90")

userdata = bytearray([
0x08, 0x00, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C,
0x1D, 0x1E, 0x1F, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C,
0x2D, 0x2E, 0x2F, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, 0x00,
0x06
])
e = Ethernet(dst= bytearray([0x84, 0xC5, 0xD5, 0x13, 0xD2, 0xAA]),
src=bytearray([0xF6, 0xE5, 0xBB, 0xD2, 0x72, 0x77]),
sec_tag=MACsec(sc = 1, an = 0x3, e=1, c=1, pn = 0x8932d612,
sci=bytearray([0x7C, 0xFD, 0xE9, 0xF9, 0xE3, 0x37, 0x24, 0xC6])),
macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'],
macsec_sak = sak,
data=userdata)
c71 = """
84 C5 D5 13 D2 AA
F6 E5 BB D2 72 77
88 E5
2F
00
89 32 D6 12
7C FD E9 F9 E3 37 24 C6
3A 4D E6 FA 32 19 10 14 DB B3 03 D9 2E E3 A9 E8
A1 B5 99 C1 4D 22 FB 08 00 96 E1 38 11 81 6A 3C
9C 9B CF 7C 1B 9B 96 DA 80 92 04 E2 9D 0E 2A 76
42
BF D3 10 A4 83 7C 81 6C CF A5 AC 23 AB 00 39 88
""".strip().replace("\n", "").replace(" ", "")
c71 = bytearray.fromhex(c71)
assert (e.pack() == c71)


e2 = Ethernet(c71,
macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'],
macsec_sak = sak,)

assert (e2.pack() == c71)

0 comments on commit e9c40f9

Please sign in to comment.