From 21f297d455755ff702309dcfae1578e950ce842d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eduard=20Br=C3=B6cker?= Date: Fri, 20 Sep 2019 11:03:11 +0200 Subject: [PATCH] add muxing support to scapy output; merge PR #398 also (#400) --- src/canmatrix/formats/scapy.py | 36 ++++++++++++++++--------------- src/canmatrix/tests/test_scapy.py | 16 +++++++++++--- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/src/canmatrix/formats/scapy.py b/src/canmatrix/formats/scapy.py index a1c3d689..a504fdf1 100644 --- a/src/canmatrix/formats/scapy.py +++ b/src/canmatrix/formats/scapy.py @@ -45,6 +45,10 @@ def get_fmt(signal): # type: (canmatrix.Signal) -> str fmt += "B" return fmt +def signal_field_line(signal): + return u'SignalField("{}", default=0, start={}, size={}, scaling={}, offset={}, unit="{}", fmt="{}"),'.format( + signal.name, signal.get_startbit(bit_numbering=1), signal.size, signal.factor, signal.offset, + signal.unit, get_fmt(signal)) def dump(db, f, **options): # type: (canmatrix.CanMatrix, typing.IO, **typing.Any) -> None scapy_decoder = textwrap.dedent(""" #!/usr/bin/env python @@ -53,34 +57,32 @@ def dump(db, f, **options): # type: (canmatrix.CanMatrix, typing.IO, **typing.A from scapy.packet import bind_layers from scapy.fields import * from scapy.layers.can import * - class DBC(CAN): - name = 'DBC' - fields_desc = [ - FlagsField('flags', 0, 3, ['error', - 'remote_transmission_request', - 'extended']), - XBitField('arbitration_id', 0, 29), - ByteField('length', None), - ThreeBytesField('reserved', 0), - ] """) for frame in db.frames: - scapy_decoder += "class " + frame.name + "(Packet):\n" + scapy_decoder += "class " + frame.name + "(SignalPacket):\n" scapy_decoder += " fields_desc = [ \n" - for signal in frame.signals: - scapy_decoder += u' SignalField("{}", default=0, start={}, size={}, scaling={}, offset={}, unit="{}", fmt="{}"),\n'.format( - signal.name, signal.get_startbit(bit_numbering=1), signal.size, signal.factor, signal.offset, - signal.unit, get_fmt(signal)) + if frame.is_multiplexed and not frame.is_complex_multiplexed: + multiplexer = frame.get_multiplexer + scapy_decoder += u' ' + signal_field_line(multiplexer) + '\n' + for signal in frame.signals: + if signal != multiplexer and signal.mux_val is not None: + scapy_decoder += u' ConditionalField(' + signal_field_line(signal) + ' lambda p: p.{} == {}),\n'.format(multiplexer.name, signal.mux_val) + elif signal != multiplexer: + scapy_decoder += u' ' + signal_field_line(signal) + '\n' + + else: + for signal in frame.signals: + scapy_decoder += u' ' + signal_field_line(signal) + '\n' scapy_decoder += " ]\n\n" for frame in db.frames: if frame.arbitration_id.extended: - scapy_decoder += "bind_layers(DBC, " + frame.name + ", arbitration_id = " + hex( + scapy_decoder += "bind_layers(SignalHeader, " + frame.name + ", identifier = " + hex( frame.arbitration_id.id) + ", flags = extended)\n" else: - scapy_decoder += "bind_layers(DBC, " + frame.name + ", arbitration_id = " + hex( + scapy_decoder += "bind_layers(SignalHeader, " + frame.name + ", identifier = " + hex( frame.arbitration_id.id) + ")\n" f.write(scapy_decoder.encode("utf8")) diff --git a/src/canmatrix/tests/test_scapy.py b/src/canmatrix/tests/test_scapy.py index bf793436..171e9065 100644 --- a/src/canmatrix/tests/test_scapy.py +++ b/src/canmatrix/tests/test_scapy.py @@ -1,6 +1,6 @@ import canmatrix.formats.scapy import io - +import os def test_scapy_frame_exists(): db = canmatrix.CanMatrix() @@ -8,8 +8,18 @@ def test_scapy_frame_exists(): outscapy = io.BytesIO() canmatrix.formats.dump(db, outscapy, "scapy") - assert "class some_frame(Packet):" in outscapy.getvalue().decode("utf8") - assert "class DBC(CAN)" in outscapy.getvalue().decode("utf8") + assert "class some_frame(SignalPacket):" in outscapy.getvalue().decode("utf8") + + +def test_scapy_muliplexed_frame(): + here = os.path.dirname(os.path.realpath(__file__)) + db = canmatrix.formats.loadp_flat(os.path.join(here, "test_frame_decoding.dbc")) + outscapy = io.BytesIO() + canmatrix.formats.dump(db, outscapy, "scapy") + assert "ConditionalField" in outscapy.getvalue().decode("utf8") + assert "myMuxer == 0" in outscapy.getvalue().decode("utf8") + assert "myMuxer == 1" in outscapy.getvalue().decode("utf8") + def test_scapy_signal_exists(): db = canmatrix.CanMatrix()