diff --git a/scapy/contrib/knx.py b/scapy/contrib/knx.py index 5193db01ecd..7fd0ce9f0f7 100644 --- a/scapy/contrib/knx.py +++ b/scapy/contrib/knx.py @@ -1,8 +1,8 @@ # SPDX-License-Identifier: GPL-2.0-or-later # This file is part of Scapy # See https://scapy.net/ for more information -# Copyright (C) 2021 Julien BEDEL -# Claire VACHEROT +# Copyright (C) 2021-2025 Julien BEDEL +# Claire VACHEROT """ KNXNet/IP @@ -22,10 +22,9 @@ # scapy.contrib.description = KNX Protocol # scapy.contrib.status = loads -import struct -from scapy.fields import PacketField, MultipleTypeField, ByteField, \ - XByteField, ShortEnumField, ShortField, \ +from scapy.fields import PacketField, MultipleTypeField, ByteField, XByteField, \ + ShortEnumField, ShortField, \ ByteEnumField, IPField, StrFixedLenField, MACField, XBitField, \ PacketListField, FieldLenField, \ StrLenField, BitEnumField, BitField, ConditionalField @@ -49,7 +48,8 @@ 0x0310: "CONFIGURATION_REQUEST", 0x0311: "CONFIGURATION_ACK", 0x0420: "TUNNELING_REQUEST", - 0x0421: "TUNNELING_ACK" + 0x0421: "TUNNELING_ACK", + 0x0530: "ROUTING_INDICATION" } # KNX Standard v2.1 - 03_08_02 p39 @@ -82,6 +82,7 @@ # KNX Standard v2.1 - 03_08_04 MESSAGE_CODES = { 0x11: "L_Data.req", + 0x29: "L_data.ind", 0x2e: "L_Data.con", 0xFC: "M_PropRead.req", 0xFB: "M_PropRead.con", @@ -101,14 +102,25 @@ # KNX Standard v2.1 - 03_03_07 p9 KNX_ACPI_CODES = { - 0: "GroupValueRead", - 1: "GroupValueResp", - 2: "GroupValueWrite", - 3: "IndAddrWrite", - 4: "IndAddrRead", - 5: "IndAddrResp", - 6: "AdcRead", - 7: "AdcResp" + 0x0: "GroupValueRead", + 0x1: "GroupValueResp", + 0x2: "GroupValueWrite", + 0x3: "IndAddrWrite", + 0x4: "IndAddrRead", + 0x5: "IndAddrResp", + 0x6: "AdcRead", + 0x7: "AdcResp", + 0x8: "MemRead", + 0xa: "MemWrite", + 0xc: "DevDescrRead", + 0x0380: "Restart", + 0x03D1: "AuthReq", + 0x03D5: "PropValueRead" +} + +KNX_SERVICE_CODES = { + 0x00: "Connect", + 0x01: "Disconnect" } CEMI_OBJECT_TYPES = { @@ -153,34 +165,29 @@ # KNX Standard v2.1 - 03_05_01 p.17 class KNXAddressField(ShortField): - def i2repr(self, pkt, x): + def i2repr(self, _, x): if x is None: return None - else: - return "%d.%d.%d" % ((x >> 12) & 0xf, (x >> 8) & 0xf, (x & 0xff)) + return "%d.%d.%d" % ((x >> 12) & 0xf, (x >> 8) & 0xf, (x & 0xff)) def any2i(self, pkt, x): - if isinstance(x, str): - try: - a, b, c = map(int, x.split(".")) - x = (a << 12) | (b << 8) | c - except ValueError: - raise ValueError(x) + # Raises ValueError in x does not have format a/b/c + if type(x) is str: + a, b, c = map(int, x.split(".")) + x = (a << 12) | (b << 8) | c return ShortField.any2i(self, pkt, x) # KNX Standard v2.1 - 03_05_01 p.18 class KNXGroupField(ShortField): - def i2repr(self, pkt, x): + def i2repr(self, _, x): return "%d/%d/%d" % ((x >> 11) & 0x1f, (x >> 8) & 0x7, (x & 0xff)) def any2i(self, pkt, x): - if isinstance(x, str): - try: - a, b, c = map(int, x.split("/")) - x = (a << 11) | (b << 8) | c - except ValueError: - raise ValueError(x) + # Raises ValueError in x does not have format a/b/c + if type(x) is str: + a, b, c = map(int, x.split("/")) + x = (a << 11) | (b << 8) | c return ShortField.any2i(self, pkt, x) @@ -197,8 +204,7 @@ class HPAI(Packet): ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p)) + p[1:] + p = (len(p)).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -212,8 +218,8 @@ class ServiceFamily(Packet): # Different DIB types depends on the "description_type_code" field -# Defining a generic DIB packet and differentiating with `dispatch_hook` or -# `MultipleTypeField` may better fit KNX specs +# Defining a generic DIB packet and differentiating with `dispatch_hook` +# or `MultipleTypeField` may better fit KNX specs class DIBDeviceInfo(Packet): name = "DIB: DEVICE_INFO" fields_desc = [ @@ -230,8 +236,7 @@ class DIBDeviceInfo(Packet): ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p)) + p[1:] + p = (len(p)).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -241,17 +246,14 @@ class DIBSuppSvcFamilies(Packet): ByteField("structure_length", 0x02), ByteEnumField("description_type", 0x02, DESCRIPTION_TYPE_CODES), ConditionalField( - PacketListField("service_family", - ServiceFamily(), - ServiceFamily, - length_from=lambda pkt: - pkt.structure_length - 0x02), + PacketListField("service_family", ServiceFamily(), ServiceFamily, + length_from=lambda + pkt: pkt.structure_length - 0x02), lambda pkt: pkt.structure_length > 0x02) ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p)) + p[1:] + p = (len(p)).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -277,15 +279,13 @@ class CRI(Packet): fields_desc = [ ByteField("structure_length", 0x02), ByteEnumField("connection_type", 0x03, CONNECTION_TYPE_CODES), - ConditionalField(PacketField("connection_data", - TunnelingConnection(), + ConditionalField(PacketField("connection_data", TunnelingConnection(), TunnelingConnection), lambda pkt: pkt.connection_type == 0x04) ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p)) + p[1:] + p = (len(p)).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -294,15 +294,13 @@ class CRD(Packet): fields_desc = [ ByteField("structure_length", 0x00), ByteEnumField("connection_type", 0x03, CONNECTION_TYPE_CODES), - ConditionalField(PacketField("connection_data", - CRDTunnelingConnection(), + ConditionalField(PacketField("connection_data", CRDTunnelingConnection(), CRDTunnelingConnection), lambda pkt: pkt.connection_type == 0x04) ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p)) + p[1:] + p = (len(p)).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -319,36 +317,50 @@ class LcEMI(Packet): BitEnumField("frame_type", 1, 1, { 1: "standard" }), - BitField("reserved_1", 0, 1), + BitField("reserved", 0, 1), BitField("repeat_on_error", 1, 1), BitEnumField("broadcast_type", 1, 1, { 1: "domain" }), BitEnumField("priority", 3, 2, { + 0: "system", 3: "low" }), BitField("ack_request", 0, 1), BitField("confirmation_error", 0, 1), # Controlfield 2 (1 byte made of 1+3+4 bits) BitEnumField("address_type", 1, 1, { + 0: "individual", 1: "group" }), BitField("hop_count", 6, 3), BitField("extended_frame_format", 0, 4), KNXAddressField("source_address", None), - KNXGroupField("destination_address", "1/2/3"), + MultipleTypeField( + [ + (KNXGroupField("destination_address", "1/2/3"), + lambda pkt: pkt.address_type == 1), + (KNXAddressField("destination_address", "1.2.3"), + lambda pkt: pkt.address_type == 0) + ], + ShortField("destination_address", "") + ), FieldLenField("npdu_length", 0x01, fmt="B", length_of="data"), # TPCI and APCI (2 byte made of 1+1+4+4+6 bits) BitEnumField("packet_type", 0, 1, { - 0: "data" + 0: "data", + 1: "control" }), BitEnumField("sequence_type", 0, 1, { 0: "unnumbered" }), - BitField("reserved_2", 0, 4), - BitEnumField("acpi", 2, 4, KNX_ACPI_CODES), - BitField("data", 0, 6) - + BitField("sequence_number", 0, 4), # Not used when sequence_type = unnumbered + ConditionalField(BitEnumField("acpi", 2, 4, KNX_ACPI_CODES), + lambda pkt: pkt.packet_type == 0), + ConditionalField(BitEnumField("service", 0, 2, KNX_SERVICE_CODES), + lambda pkt: pkt.packet_type == 1), + ConditionalField(BitField("data", 0, 6), + lambda pkt: pkt.packet_type == 0) ] @@ -392,7 +404,7 @@ class CEMI(Packet): # KNX Standard v2.1 - 03_08_02 p28 class KNXSearchRequest(Packet): - name = "SEARCH_REQUEST", + name = "SEARCH_REQUEST" fields_desc = [ PacketField("discovery_endpoint", HPAI(), HPAI) ] @@ -400,7 +412,7 @@ class KNXSearchRequest(Packet): # KNX Standard v2.1 - 03_08_02 p28 class KNXSearchResponse(Packet): - name = "SEARCH_RESPONSE", + name = "SEARCH_RESPONSE" fields_desc = [ PacketField("control_endpoint", HPAI(), HPAI), PacketField("device_info", DIBDeviceInfo(), DIBDeviceInfo), @@ -424,8 +436,8 @@ class KNXDescriptionResponse(Packet): PacketField("device_info", DIBDeviceInfo(), DIBDeviceInfo), PacketField("supported_service_families", DIBSuppSvcFamilies(), DIBSuppSvcFamilies) - # TODO: this is an optional field in KNX specs, - # => Add conditions to take it into account + # TODO: this is an optional field in KNX specs, add conditions to + # take it into account # PacketField("other_device_info", DIBDeviceInfo(), DIBDeviceInfo) ] @@ -501,8 +513,7 @@ class KNXConfigurationRequest(Packet): ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p[:4])) + p[1:] + p = (len(p[:4])).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -517,8 +528,7 @@ class KNXConfigurationACK(Packet): ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p)) + p[1:] + p = (len(p)).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -534,8 +544,7 @@ class KNXTunnelingRequest(Packet): ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p[:4])) + p[1:] + p = (len(p[:4])).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -550,8 +559,18 @@ class KNXTunnelingACK(Packet): ] def post_build(self, p, pay): - if self.structure_length is None: - p = struct.pack("!B", len(p)) + p[1:] + p = (len(p)).to_bytes(1, byteorder='big') + p[1:] + return p + pay + + +class KNXRoutingIndication(Packet): + name = "ROUTING_INDICATION" + fields_desc = [ + PacketField("cemi", CEMI(), CEMI) + ] + + def post_build(self, p, pay): + p = (len(p[:4])).to_bytes(1, byteorder='big') + p[1:] return p + pay @@ -559,7 +578,7 @@ def post_build(self, p, pay): # we made the choice to define a KNX service as a payload for a KNX Header # it could also be possible to define the body as a conditional PacketField -# contained after header +# contained after the header. class KNX(Packet): name = "KNXnet/IP" @@ -572,18 +591,17 @@ class KNX(Packet): def post_build(self, p, pay): # computes header_length - if self.header_length is None: - p = struct.pack("!B", len(p)) + p[1:] + p = (len(p)).to_bytes(1, byteorder='big') + p[1:] # computes total_length - if self.total_length is None: - p = p[:-2] + struct.pack("!H", len(p) + len(pay)) + p = p[:-2] + (len(p) + len(pay)).to_bytes(2, byteorder='big') return p + pay # LAYERS BINDING + bind_bottom_up(UDP, KNX, dport=3671) bind_bottom_up(UDP, KNX, sport=3671) -bind_layers(UDP, KNX, sport=3671, dport=3671) +bind_layers(UDP, KNX, dport=3671, sport=3671) bind_layers(KNX, KNXSearchRequest, service_identifier=0x0201) bind_layers(KNX, KNXSearchResponse, service_identifier=0x0202) @@ -599,15 +617,17 @@ def post_build(self, p, pay): bind_layers(KNX, KNXConfigurationACK, service_identifier=0x0311) bind_layers(KNX, KNXTunnelingRequest, service_identifier=0x0420) bind_layers(KNX, KNXTunnelingACK, service_identifier=0x0421) +bind_layers(KNX, KNXRoutingIndication, service_identifier=0x0530) # we bind every layer to Padding in order to delete their payloads # (from https://github.com/secdev/scapy/issues/360) -# we could also define a new Packet class with no payload and -# inherit every KNX packet from it : +# we could also define a new Packet class with no payload and inherit +# every KNX packet from it : +# # class _KNXBodyNoPayload(Packet): # -# def extract_padding(self, s): -# return b"", None +# def extract_padding(self, s): +# return b"", None bind_layers(HPAI, Padding) bind_layers(ServiceFamily, Padding) diff --git a/test/contrib/knx.uts b/test/contrib/knx.uts index c7ab8abf7ee..300616eba4c 100644 --- a/test/contrib/knx.uts +++ b/test/contrib/knx.uts @@ -69,7 +69,42 @@ assert isinstance(p.payload, KNXTunnelingRequest) p = KNX(b'\x06\x10\x04!\x00\n\x04\x01\x00\x00') assert isinstance(p.payload, KNXTunnelingACK) -+ Test layer binding -= Destination port - - ++ Test KNX packet parsing += KNX Search Request +pkt = KNX(b'\x06\x10\x02\x01\x00\x0e\x08\x01\x00\x00\x00\x00\x00\x00') +assert pkt.service_identifier == 0x0201 +assert pkt.discovery_endpoint.ip_address == "0.0.0.0" + += KNX Search response +pkt = KNX(b'\x06\x10\x02\x02\x00F\x08\x01\x00\x00\x00\x00\x00\x006\x01\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x02') +assert pkt.service_identifier == 0x0202 +assert pkt.device_info.description_type == 1 + ++ Test KNX packet building += KNX Search Request +pkt = KNX()/KNXSearchRequest() +pkt.discovery_endpoint.ip_address = "192.168.1.1" +assert raw(pkt) == b'\x06\x10\x02\x01\x00\x0e\x08\x01\xc0\xa8\x01\x01\x00\x00' + += KNX Search Response +pkt = KNX()/KNXSearchResponse() +pkt.control_endpoint.port = 3671 +pkt.device_info.device_multicast_address = "224.0.23.12" +pkt.device_info.device_mac_address = "ff:ff:ff:ff:ff:ff" + +assert raw(pkt) == b'\x06\x10\x02\x02\x00F\x08\x01\x00\x00\x00\x00\x0eW6\x01\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xe0\x00\x17\x0c\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x02' + += KNX Individual address parsing +test_addr = DIBDeviceInfo() +test_addr.knx_address = "1.1.1" +assert test_addr.knx_address == 0x1101 + += CEMI individual address in a field that takes multiple types +test_addr = CEMI(message_code=0x11) # L_Data.req +test_addr.cemi_data.address_type = 0 # Individual address +test_addr.cemi_data.destination_address = "1.1.1" + += CEMI group address in a field that takes multiple types +test_addr = CEMI(message_code=0x11) # L_Data.req +test_addr.cemi_data.address_type = 1 # Group address +test_addr.cemi_data.destination_address = "1/1/1"