Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added initial support for MACsec (#672) #673

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ Dug Song <[email protected]>
Contributors
------------

Philip Axer <[email protected]>
MACsec support

Timur Alperovich <[email protected]>
radiotap module

Expand Down
277 changes: 272 additions & 5 deletions dpkt/ethernet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
from .utils import mac_to_str
from .compat import compat_ord, iteritems, isstr

try:
from Crypto.Cipher import AES
crypto_support = True
except ImportError:
crypto_support = False

ETH_CRC_LEN = 4
ETH_HDR_LEN = 14

Expand Down Expand Up @@ -48,6 +54,7 @@
ETH_TYPE_LLDP = 0x88CC # Link Layer Discovery Protocol
ETH_TYPE_TEB = 0x6558 # Transparent Ethernet Bridging
ETH_TYPE_PROFINET = 0x8892 # PROFINET protocol
ETH_TYPE_MACSEC = 0x88E5 # MAC security

# all QinQ types for fast checking
_ETH_TYPES_QINQ = frozenset([ETH_TYPE_8021Q, ETH_TYPE_8021AD, ETH_TYPE_QINQ1, ETH_TYPE_QINQ2])
Expand Down Expand Up @@ -76,14 +83,19 @@ class Ethernet(dpkt.Packet):

__pprint_funcs__ = {
'dst': mac_to_str,
'src': mac_to_str,
'src': mac_to_str
}

def __init__(self, *args, **kwargs):
self._next_type = None
# We are setting these directly here. in case dpkt.Packet.__init__ is called with data as the first argument
# all the other arguments will be ignored
self.macsec_ciphersuite = kwargs.get('macsec_ciphersuite', None)
self.macsec_sak = kwargs.get('macsec_sak', None)

dpkt.Packet.__init__(self, *args, **kwargs)
# if data was given in kwargs, try to unpack it
if self.data:
if kwargs.get('data', None):
if isstr(self.data) or isinstance(self.data, bytes):
self._unpack_data(self.data)

Expand Down Expand Up @@ -128,14 +140,57 @@ def _unpack_data(self, buf):
if buf[:2] == b'\x00\x00': # looks like the control word (ECW)
buf = buf[4:] # skip the ECW
self._next_type = ETH_TYPE_TEB # re-use TEB class mapping to decode Ethernet

elif self._next_type == ETH_TYPE_MACSEC:
buf = self._unpack_macsec(buf)
try:
eth_type = self._next_type or self.type
self.data = self._typesw[eth_type](buf)

setattr(self, self.data.__class__.__name__.lower(), self.data)
except (KeyError, dpkt.UnpackError):
self.data = buf

def _unpack_macsec(self, buf):
if self.macsec_ciphersuite is None:
raise dpkt.PackError("MACsec ciphersuite is not set, unable to decode MACsec frame")

self.sec_tag = MACsec(buf)
offset = self.sec_tag.__hdr_len__
if self.sec_tag.sc:
self.sec_tag.sci = buf[offset : offset + 8]
offset += 8 # SCI is 8 bytes long

buf = buf[offset:]

if self.sec_tag.sl == 0:
self.sec_tag.icv = buf[-self.macsec_ciphersuite.icv_len:]
buf = buf[:-self.macsec_ciphersuite.icv_len]
else:
if len(buf) < self.sec_tag.sl:
raise dpkt.PackError("Not enough data to unpack MACsec frame")

self.sec_tag.icv = buf[self.sec_tag.sl : self.sec_tag.sl + self.macsec_ciphersuite.icv_len]
buf = buf[:self.sec_tag.sl]


if not crypto_support:
# in case we are not able to decrypt the frame, we just leave the data as is
self.data = buf
self._next_type = 0

if(self.sec_tag.e and self.sec_tag.c):
# Confientiality and Integrity
nonce = self.sec_tag.sci + struct.pack('>I', self.sec_tag.pn)
cipher = AES.new(self.macsec_sak, AES.MODE_GCM, mac_len = self.macsec_ciphersuite.icv_len, nonce = nonce)
buf = cipher.decrypt(buf)

self._next_type = struct.unpack('>H', buf[0:2])[0]
return buf[2:]
else:
# Integrity
self._next_type = struct.unpack('>H', buf[0:2])[0]
return buf[2:]

def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
if self.type > 1500:
Expand Down Expand Up @@ -200,6 +255,7 @@ def unpack(self, buf):

def pack_hdr(self):
tags_buf = b''
sec_tag_buf = b''
new_type = self.type # replacement self.type when packing eth header
is_isl = False # ISL wraps Ethernet, this determines order of packing

Expand All @@ -214,6 +270,17 @@ def pack_hdr(self):
new_type = ETH_TYPE_MPLS
tags_buf = b''.join(lbl.pack_hdr() for lbl in self.mpls_labels)

elif getattr(self, 'sec_tag', None):
last_tag_type = self.type # default
new_type = ETH_TYPE_MACSEC
if isinstance(self.data, dpkt.Packet):
last_tag_type = self._typesw_rev.get(self.data.__class__, self.type)
sec_tag = self.sec_tag
sec_tag.type = last_tag_type
if self.macsec_ciphersuite is None:
raise dpkt.PackError("MACsec ciphersuite is not set")
sec_tag_buf = sec_tag.pack_hdr()

elif getattr(self, 'vlan_tags', None):
# set last tag type to next layer pointed by self.data
last_tag_type = self.type # default
Expand Down Expand Up @@ -253,7 +320,7 @@ def pack_hdr(self):

hdr_buf = dpkt.Packet.pack_hdr(self)[:-2] + struct.pack('>H', new_type)
if not is_isl:
return hdr_buf + tags_buf
return hdr_buf + sec_tag_buf + tags_buf
else:
return tags_buf + hdr_buf

Expand All @@ -276,8 +343,48 @@ def __bytes__(self):
fcs = struct.unpack('<I', struct.pack('>I', revcrc))[0] # bswap32
fcs = struct.pack('>I', fcs)
tail = getattr(self, 'padding', b'') + fcs + getattr(self, 'trailer', b'')

if getattr(self, 'sec_tag', None) is not None:
return self._bytes_macsec()

return bytes(dpkt.Packet.__bytes__(self) + tail)

def _bytes_macsec(self):
if not crypto_support:
# Leave the payload data in clear and add a dummy ICV of zeros
# This results in an invalid MACsec frame, but it's the best we can do without the crypto library
return bytes(self.pack_hdr() + bytes(self.data) + self.sec_tag.icv)

hdr_data = self.pack_hdr()

if getattr(self, 'macsec_sak', None) is None:
raise dpkt.PackError("Not able to pack MACsec frame. SAK not set")

if(self.macsec_ciphersuite.blocksize / 8 != len(self.macsec_sak)):
raise dpkt.PackError("SAK length does not match the blocksize of the ciphersuite")

if(isinstance(self.data, dpkt.Packet)):
# When the next layer is a dpkt.Packet, we take the ethertype from the sec_tag
data = struct.pack('>H', self.sec_tag.type) + bytes(self.data)
else:
# Othwerise, we asume the userdata is raw data and includes the ethertype already
data = bytes(self.data)

# For the XPN case, we need to include 64bits of PN. Not supported right now
iv = self.sec_tag.sci + struct.pack('>I', self.sec_tag.pn)
cipher = AES.new(self.macsec_sak, AES.MODE_GCM, mac_len = self.macsec_ciphersuite.icv_len, nonce = iv)

if self.sec_tag.e and self.sec_tag.c:
cipher.update(hdr_data)
data, self.sec_tag.icv = cipher.encrypt_and_digest(data)

return bytes(hdr_data + data + self.sec_tag.icv)
else:
cipher.update(hdr_data + data)
self.sec_tag.icv = cipher.digest()

return bytes(hdr_data + data + self.sec_tag.icv)

def __len__(self):
tags = getattr(self, 'mpls_labels', []) + getattr(self, 'vlan_tags', [])
_len = dpkt.Packet.__len__(self) + sum(t.__hdr_len__ for t in tags)
Expand Down Expand Up @@ -319,14 +426,92 @@ def __load_types():


def _mod_init():
"""Post-initialization called when all dpkt modules are fully loaded"""
"""Post-initialization called when all dpkt modules are fully loaded"""
if not Ethernet._typesw:
__load_types()


# Misc protocols

ICV_LEN = {
'AES-GCM-128': 16,
'AES-GCM-256': 16,
}

MACSEC_CIPHERSUITES = {
'AES-GCM-128' : MACsecCipherSuite('AES', 128, 'GCM'),
'AES-GCM-256' : MACsecCipherSuite('AES', 256, 'GCM'),
}

class MACsecCipherSuite(object):
def __init__(self, cipher, blocksize, mode):
self._cipher = cipher
self._blocksize = blocksize
self._mode = mode

@property
def name(self):
return self._cipher + "-" + self._mode + "-" + str(self.blocksize)

@property
def blocksize(self):
return self._blocksize

@property
def icv_len(self):
return ICV_LEN[self.name]

def __repr__(self):
return self.name

class MACsec(dpkt.Packet):
"""IEEE 802.1AE MACsec tag"""

__hdr__ = (
('_tci_an', 'B', 0),
('sl', 'B', 0), # short length
('pn', 'L', 0), # packet number (LSBs for XPN)
# We cannot have 'type' as as a field to indicate the next layer type
# his is because type is potentially encrypted and can not be serialized as part of the header
)

__bit_fields__ = {
'_tci_an': (
('v', 1), # version, 1 bit
('es', 1), # end station, 1 bit
('sc', 1), # secure channel explicitly encoded, 1 bit
('scb', 1), # EPON single copy broadcast, 1 bit
('e', 1), # Encryption bit, 1 bit
('c', 1), # Changed Text bit, 1 bit
('an', 2), # association number, 2 bits
)
}

def __init__(self, *args, **kwargs):
self.sci = b'\0\0\0\0\0\0\0\0' # 8 bytes sci
self.icv = b'' # integrity check value, this is the trailer
self.type = 0 # next layer type

dpkt.Packet.__init__(self, *args, **kwargs)

def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = b''

def pack_hdr(self):
sci = b''
if self.sc:
sci = self.sci
if len(sci) != 8:
raise dpkt.PackError("SCI must be 8 bytes long")
return dpkt.Packet.pack_hdr(self) + sci

def __len__(self):
sci_len = 0
if self.sc:
sci_len = 8
return self.__hdr_len__ + sci_len

class MPLSlabel(dpkt.Packet):
"""A single entry in MPLS label stack"""

Expand Down Expand Up @@ -873,3 +1058,85 @@ def test_eth_novell():
assert isinstance(eth.data, dpkt.ipx.IPX)
assert eth.data.tc == 2
assert eth.data.data == b''


def test_macsec_c11():
# C.1.1 GCM-AES-128 (54-octet frame integrity protection)

userdata = """08 00 0F 10 11 12 13 14 15 16 17 18 19 1A 1B 1C
1D 1E 1F 20 21 22 23 24 25 26 27 28 29 2A 2B 2C
2D 2E 2F 30 31 32 33 34 00 01""".strip().replace("\n", "").replace(" ", "")

c11 = """
D6 09 B1 F0 56 63
7A 0D 46 DF 99 8D
88 E5
22
2A
B2 C2 84 65
12 15 35 24 C0 89 5E 81
08 00 0F 10 11 12 13 14 15 16 17 18 19 1A 1B 1C
1D 1E 1F 20 21 22 23 24 25 26 27 28 29 2A 2B 2C
2D 2E 2F 30 31 32 33 34 00 01
F0 94 78 A9 B0 90 07 D0 6F 46 E9 B6 A1 DA 25 DD
""".strip().replace("\n", "").replace(" ", "")
userdata = bytearray.fromhex(userdata)
c11 = bytearray.fromhex(c11)

sak = bytearray.fromhex("AD7A2BD03EAC835A6F620FDCB506B345")

e = Ethernet(dst= bytearray([0xD6, 0x09, 0xB1, 0xF0, 0x56, 0x63]),
src=bytearray([0x7A, 0x0D, 0x46, 0xDF, 0x99, 0x8D]),
sec_tag=MACsec(sc = 1, an = 0x2, e=0, c=0, pn = 0xB2C28465, sl = 0x2a,
sci=bytearray([0x12, 0x15, 0x35, 0x24, 0xC0, 0x89, 0x5E, 0x81])),
macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'],
macsec_sak = sak,
data=userdata)
assert(e.pack() == c11)

# Check if we can unpack a packet and get the same result for packing
e2 = Ethernet(c11, macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'], macsec_sak = sak)
assert(e2.pack() == c11)



def test_macsec_c71():
# C.7.1 GCM-AES-128 (61-octet frame confidentiality protection)
sak = bytearray.fromhex("013FE00B5F11BE7F866D0CBBC55A7A90")

userdata = bytearray([
0x08, 0x00, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C,
0x1D, 0x1E, 0x1F, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C,
0x2D, 0x2E, 0x2F, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, 0x00,
0x06
])
e = Ethernet(dst= bytearray([0x84, 0xC5, 0xD5, 0x13, 0xD2, 0xAA]),
src=bytearray([0xF6, 0xE5, 0xBB, 0xD2, 0x72, 0x77]),
sec_tag=MACsec(sc = 1, an = 0x3, e=1, c=1, pn = 0x8932d612,
sci=bytearray([0x7C, 0xFD, 0xE9, 0xF9, 0xE3, 0x37, 0x24, 0xC6])),
macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'],
macsec_sak = sak,
data=userdata)
c71 = """
84 C5 D5 13 D2 AA
F6 E5 BB D2 72 77
88 E5
2F
00
89 32 D6 12
7C FD E9 F9 E3 37 24 C6
3A 4D E6 FA 32 19 10 14 DB B3 03 D9 2E E3 A9 E8
A1 B5 99 C1 4D 22 FB 08 00 96 E1 38 11 81 6A 3C
9C 9B CF 7C 1B 9B 96 DA 80 92 04 E2 9D 0E 2A 76
42
BF D3 10 A4 83 7C 81 6C CF A5 AC 23 AB 00 39 88
""".strip().replace("\n", "").replace(" ", "")
c71 = bytearray.fromhex(c71)
assert (e.pack() == c71)


e2 = Ethernet(c71,
macsec_ciphersuite = MACSEC_CIPHERSUITES['AES-GCM-128'],
macsec_sak = sak,)

assert (e2.pack() == c71)