Skip to content

Commit

Permalink
tests for radio ip, udp/ipv4 compressed header and elements
Browse files Browse the repository at this point in the history
  • Loading branch information
smarek committed Aug 16, 2022
1 parent caa1c97 commit dc9d148
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 31 deletions.
2 changes: 1 addition & 1 deletion okdmr/dmrlib/etsi/layer3/elements/ip_address_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ def from_bits(bits: bitarray) -> "IPAddressIdentifier":
return IPAddressIdentifier(ba2int(bits[0:4]))

def as_bits(self) -> bitarray:
return int2ba(self.value)
return int2ba(self.value, length=4)
2 changes: 1 addition & 1 deletion okdmr/dmrlib/etsi/layer3/elements/udp_port_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ def from_bits(bits: bitarray) -> "UDPPortIdentifier":
return UDPPortIdentifier(ba2int(bits[:7]))

def as_bits(self) -> bitarray:
return int2ba(self.value)
return int2ba(self.value, length=7)
6 changes: 6 additions & 0 deletions okdmr/dmrlib/etsi/layer3/pdu/udp_ipv4_compressed_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def from_bits(bits: bitarray) -> "UDPIPv4CompressedHeader":
spid == UDPPortIdentifier.InExtendedHeader
and dpid == UDPPortIdentifier.InExtendedHeader
):
assert (
len(bits) >= 72
), f"With both UDP ports in extended header, we need at least 72 bits, got {len(bits)}"
# both udp port numbers are in extended headers
e1 = ba2int(bits[data_start : data_start + 16])
e2 = ba2int(bits[data_start + 16 : data_start + 32])
Expand All @@ -89,6 +92,9 @@ def from_bits(bits: bitarray) -> "UDPIPv4CompressedHeader":
spid == UDPPortIdentifier.InExtendedHeader
or dpid == UDPPortIdentifier.InExtendedHeader
):
assert (
len(bits) >= 56
), f"With single UDP port in extended header, we need at least 56 bits, got {len(bits)}"
# single udp port id is in extended header
e1 = ba2int(bits[data_start : data_start + 16])
data_start += 16
Expand Down
34 changes: 21 additions & 13 deletions okdmr/dmrlib/hytera/pdu/radio_ip.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
from typing import Optional, Union
import socket
from typing import Optional, Union, Literal

from okdmr.dmrlib.utils.bytes_interface import BytesInterface

Expand All @@ -10,11 +10,7 @@ class RadioIP(BytesInterface):
Hytera limits subnet to single byte (first IP octet)
"""

def __init__(
self,
radio_id: Union[int, bytes],
subnet: int = 0x0A,
):
def __init__(self, radio_id: Union[int, bytes], subnet: int = 0x0A):
self.subnet: int = subnet
self.radio_id: int = (
radio_id
Expand All @@ -23,19 +19,31 @@ def __init__(
)

@staticmethod
def from_bytes(data: bytes, endian: str = "big") -> Optional["RadioIP"]:
print(f"RadioIP from_bytes {data.hex()} endian {endian}", file=sys.stderr)
def from_bytes(
data: bytes, endian: Literal["big", "little"] = "big"
) -> Optional["RadioIP"]:
assert len(data) >= 4, f"4 bytes required to construct RadioIP"
return RadioIP(subnet=data[3], radio_id=data[0:3])
if endian == "little":
data = data[::-1]
return RadioIP(subnet=data[0], radio_id=data[1:4])

def as_bytes(self, endian: str = "big") -> bytes:
return self.radio_id.to_bytes(length=3, byteorder="big") + bytes([self.subnet])
raw: bytes = bytes([self.subnet]) + self.radio_id.to_bytes(
length=3, byteorder="big"
)
return raw if endian == "big" else raw[::-1]

def as_ip(self) -> str:
return f"{self.subnet}."
return str(self)

@staticmethod
def from_ip(ip: str, endian: Literal["big", "little"] = "big") -> "RadioIP":
return RadioIP.from_bytes(data=socket.inet_aton(ip), endian=endian)

def __str__(self):
return f"{self.subnet}.{self.radio_id}"
return socket.inet_ntoa(
bytes([self.subnet]) + self.radio_id.to_bytes(length=3, byteorder="big")
)

def __repr__(self):
return f"[RadioIP subnet:{self.subnet} id:{self.radio_id}]"
10 changes: 0 additions & 10 deletions okdmr/dmrlib/tools/pcap_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,9 @@ def debug_packet(
ip_str: str = f"{packet.src}:{packet.getlayer(UDP).sport}\t-> {packet.dst}:{packet.getlayer(UDP).dport}\t"
if isinstance(pkt, IpSiteConnectProtocol):
burst: Burst = Burst.from_hytera_ipsc(pkt)
if not silent:
print(
f"{ip_str} IPSC TS:{1 if pkt.timeslot_raw == IpSiteConnectProtocol.Timeslots.timeslot_1 else 2} "
f"SEQ: {pkt.sequence_number} {repr(burst)}"
)
elif isinstance(pkt, Mmdvm2020):
if isinstance(pkt.command_data, Mmdvm2020.TypeDmrData):
burst: Burst = Burst.from_mmdvm(pkt.command_data)
if not silent:
print(
f"{ip_str} MMDVM TS:{1 if pkt.command_data.slot_no == Mmdvm2020.Timeslots.timeslot_1 else 2} "
f"SEQ: {pkt.command_data.sequence_no} {repr(burst)}"
)
elif isinstance(pkt, IpSiteConnectHeartbeat):
pass
elif not hide_unknown and not silent:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from bitarray import bitarray

from okdmr.dmrlib.etsi.layer3.elements.ip_address_identifier import IPAddressIdentifier


def test_said_daid():
said = IPAddressIdentifier.from_bits(bits=bitarray("0001"))
assert said == IPAddressIdentifier.USBEthernetInterfaceNetwork
assert said.as_bits() == bitarray("0001")

assert IPAddressIdentifier(3) == IPAddressIdentifier.Reserved
assert IPAddressIdentifier(13) == IPAddressIdentifier.ManufacturerSpecific
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from bitarray import bitarray

from okdmr.dmrlib.etsi.layer3.elements.udp_port_identifier import UDPPortIdentifier


def test_spid_dpid():
assert UDPPortIdentifier(1) == UDPPortIdentifier.UTF16BE_TextMessage
assert UDPPortIdentifier.from_bits(bitarray("0000010")).as_bits() == bitarray(
"0000010"
)
assert UDPPortIdentifier(4) == UDPPortIdentifier.Reserved
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from bitarray import bitarray

from okdmr.dmrlib.etsi.layer3.pdu.udp_ipv4_compressed_header import (
UDPIPv4CompressedHeader,
)


def test_extended_headers():
both: UDPIPv4CompressedHeader = UDPIPv4CompressedHeader.from_bits(
bitarray("0" * 72)
)
assert both.extended_header_2 is not None
assert both.extended_header_1 is not None

single: UDPIPv4CompressedHeader = UDPIPv4CompressedHeader.from_bits(
bitarray("0" * 31 + "1" + "0" * 40)
)
assert single.extended_header_1 is not None
assert single.extended_header_2 is None
13 changes: 7 additions & 6 deletions okdmr/tests/dmrlib/hytera/pdu/test_radio_ip.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
from typing import List, Tuple

import pytest

from okdmr.dmrlib.hytera.pdu.radio_ip import RadioIP


@pytest.mark.skip
def test_radio_ip():
rips: List[Tuple] = [
# rcp
("0800000a", "10.0.0.8", 8, "little"),
# rrs, lp, tp, dtp
("0a000001", "10.0.0.1", 1, "big"),
("0a000050", "10.0.0.80", 80, "big"),
("0a2110dd", "10", 10, "big"),
# rcp
("0800000a", "10.0.0.8", 8, "little"),
("0a2110dd", "10.33.16.221", 2167005, "big"),
]
for rip_bytes, ip_str, radio_id, endian in rips:
rip = RadioIP.from_bytes(data=bytes.fromhex(rip_bytes), endian=endian)
assert rip.subnet == 10
assert rip.radio_id == radio_id
assert rip.as_ip() == ip_str
assert len(repr(rip))

rip = RadioIP.from_ip(ip="10.33.16.221", endian="big")
assert rip.as_ip() == "10.33.16.221"
6 changes: 6 additions & 0 deletions okdmr/tests/dmrlib/transmission/test_transmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ def test_watcher(self, capsys):
terminal.debug(printout=True)
assert len(capsys.readouterr().out)

for ts_no, ts in terminal.timeslots.items():
assert len(ts.debug(printout=False))
capsys.readouterr()
ts.debug(printout=True)
assert len(capsys.readouterr().out)

watcher.end_all_transmissions()


Expand Down

0 comments on commit dc9d148

Please sign in to comment.