Skip to content

Commit

Permalink
rc development break
Browse files Browse the repository at this point in the history
  • Loading branch information
smarek committed Dec 2, 2024
1 parent 1a00ff4 commit 108e33f
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 12 deletions.
3 changes: 3 additions & 0 deletions okdmr/dmrlib/etsi/fec/hamming_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ def check_and_correct(cls, bits: bitarray) -> (bool, bitarray):
numpy.array(bits.tolist()), cls.PARITY_CHECK_MATRIX
).tolist()
try:
# print("syndrome", syndrome)
# print(cls.PARITY_CHECK_MATRIX.T.tolist())
bits.invert(cls.PARITY_CHECK_MATRIX.T.tolist().index(syndrome))
check = True
except ValueError as e:
print(e)
# ValueError is thrown in case syndrome is not found in parity check result, making the message uncorrectable
return False, bits

Expand Down
2 changes: 1 addition & 1 deletion okdmr/dmrlib/etsi/layer2/burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def __repr__(self) -> str:
and self.emb.preemption_and_power_control_indicator
== PreemptionPowerIndicator.CarriesReverseChannelInformation
):
status += f"[RC Info {self.embedded_signalling_bits}] "
status += f"[RC Info {self.embedded_signalling_bits} hex {self.embedded_signalling_bits.tobytes().hex()}] "

if self.is_vocoder:
status += f" [{self.voice_burst}]"
Expand Down
38 changes: 30 additions & 8 deletions okdmr/dmrlib/tools/pcap_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
PreemptionPowerIndicator,
)
from okdmr.dmrlib.etsi.layer2.pdu.full_link_control import FullLinkControl
from okdmr.dmrlib.etsi.layer2.pdu.reverse_channel import ReverseChannel
from okdmr.dmrlib.transmission.transmission_watcher import TransmissionWatcher
from okdmr.dmrlib.utils.parsing import try_parse_packet
from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020
Expand All @@ -30,21 +31,34 @@ class EmbeddedExtractor:
Helper class, collects
"""

def __init__(self):
def __init__(self, filter_rc: bool = False):
self.data: Dict[str, Tuple[LCSS, bitarray]] = {}
self.filter_rc: bool = filter_rc

def process_packet(self, data: bytes, packet: IP) -> Optional[FullLinkControl]:
burst: Optional[Burst] = PcapTool.debug_packet(
data=data, packet=packet, hide_unknown=True, silent=True
)

if not burst or not burst.has_emb:
return

if self.filter_rc:
if (
burst.emb.preemption_and_power_control_indicator
== PreemptionPowerIndicator.CarriesReverseChannelInformation
):
# print("FOUND RC")
print(data.hex())
print(repr(burst))
print(burst.embedded_signalling_bits.to01())
return

if (
not burst
or not burst.has_emb
or burst.emb.link_control_start_stop == LCSS.SingleFragmentLCorCSBK
or burst.emb.preemption_and_power_control_indicator
burst.emb.preemption_and_power_control_indicator
== PreemptionPowerIndicator.CarriesReverseChannelInformation
):
# skip RC in following embedded extraction
return

full_lc: Optional[FullLinkControl] = None
Expand Down Expand Up @@ -451,6 +465,13 @@ def _arguments() -> ArgumentParser:
default=[],
help='Filter traffic by "ORIGIN" IP address(es)',
)
parser.add_argument(
"--filter-rc",
dest="filter_rc",
default=False,
action="store_true",
help="Extract only bursts embedding RC by PreemptionPowerIndicator.CarriesReverseChannelInformation",
)
return parser

@staticmethod
Expand Down Expand Up @@ -478,8 +499,9 @@ def main(
watcher = TransmissionWatcher().set_debug_voice_bytes(
do_debug=args.debug_vocoder_bytes
)
if args.extract_embedded_lc:
callback = EmbeddedExtractor().process_packet

if args.extract_embedded_lc or args.filter_rc:
callback = EmbeddedExtractor(filter_rc=args.filter_rc).process_packet
elif args.observe_transmissions:
callback = watcher.process_packet
finish_callback = watcher.end_all_transmissions
Expand All @@ -497,7 +519,7 @@ def main(
finish_callback=finish_callback,
)

if args.analyze_ipsc:
if args.analyze_ipsc and not args.no_statistics:
ipsc_analyze.print_stats()

if return_stats:
Expand Down
81 changes: 78 additions & 3 deletions okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,94 @@
from typing import List, Tuple

from okdmr.dmrlib.etsi.fec.hamming_16_11_4 import Hamming16114
from okdmr.dmrlib.etsi.fec.vbptc_32_11 import VBPTC3211
from okdmr.dmrlib.etsi.layer2.pdu.reverse_channel import ReverseChannel
from bitarray import bitarray

from okdmr.dmrlib.utils.bits_bytes import bits_to_bytes, byteswap_bytes, bytes_to_bits


def test_reverse_channel():
merged_bits_odds = bitarray(f"01000000001101011" + f"10111111110010100")

print(
"ODD PARITY"
if (merged_bits_odds.count(0) == merged_bits_odds.count(1))
else "EVEN PARITY"
)
print(
f"ZEROES {merged_bits_odds.count(0)}\tONES {merged_bits_odds.count(1)}\t\tLEN {len(merged_bits_odds)}"
)

merged_bits_even = bitarray(f"01000000010010000" + f"01000000010010000")

print(
"ODD PARITY"
if (merged_bits_even.count(0) == merged_bits_even.count(1))
else "EVEN PARITY"
)
print(
f"ZEROES {merged_bits_even.count(0)}\tONES {merged_bits_even.count(1)}\t\tLEN {len(merged_bits_even)}"
)

rcs: List[Tuple[str,]] = [
("00000000000000000111100100100010",),
("00010000100000111000101000001010",),
("01000000100100000000000000000000",),
("01001100011111111110100101011010",),
("01010101011101101000010110011111",),
("01011011011000001010111000010101",),
("11111000000001100000011001000101",),
("01011100111000010100011101111011",),
("01101111100100100001010100111000",),
("10001111110111110000010101010111",),
("10010001101101110100111001010101",),
("10011010011111101010111100001100",),
("10110100101110001110101110010100",),
("10110101101110001110111110000100",),
("11001100111111101100001011100101",),
("11101111101110001010111010010100",),
# ("",),
("11111000000001100000011001000101",),
("11111111111011111100111111111011",),
("11111111111111111111000101111011",),
("11111111111111111111111111111011",),
("11111111111111111111111111111110",),
("11111111111111111111111111111111",),
]
for (rc,) in rcs:
print("-!-" * 20)
rc_bits: bitarray = bitarray(rc)
rc_bits.reverse()
rc_pdu: ReverseChannel = ReverseChannel.from_bits(rc_bits)
print(repr(rc_pdu))
assert rc_pdu.as_bits() == rc_bits
if False:
print("interleaved")
print(rc_bits)
print("deinterleaved")
rc_bits_deinterleaved = VBPTC3211.deinterleave_all_bits(rc_bits)
print(rc_bits_deinterleaved)

rc_idx = [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
h_idx = [22, 24, 26, 28, 30]
pc_icx = [17, 19, 21, 23, 25, 27, 29, 31, 1, 3, 5, 7, 9, 11, 13, 15]

rc2_bits = bitarray([rc_bits[x] for x in rc_idx])
h_bits = bitarray([rc_bits[x] for x in h_idx])
pc_bits = bitarray([rc_bits[x] for x in pc_icx])

if False:
print("manually")

print("ODD PARITY" if (rc_bits.count(0) == rc_bits.count(1)) else "EVEN PARITY")
if False:
h_bits.reverse()
rc2_bits.reverse()
pc_bits.reverse()

print(rc2_bits + h_bits)
print(pc_bits)

hamming_data = h_bits + rc2_bits
# hamming_success, corrected_bits = Hamming16114.check_and_correct(hamming_data)
# print(f"hamming status: {'OK' if hamming_success else 'FAILED'}")

# assert rc_pdu.as_bits() == rc_bits
40 changes: 40 additions & 0 deletions okdmr/tests/dmrlib/hytera/test_hytera_ipsc.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,43 @@ def test_wakeup():
b.deinterleave(bits=b.full_bits, data_type=DataTypes.Reserved)
== b.full_bits
)


def test_multisite():
msgs: List[str] = [
"c35200502004000001000501010000001111cccc1111000040b8f29632653297a5f9d0886591215700100000210ee0b344a48ee2024c54298e23d4e3000000009a0204008d062800",
"c35200501f04000001000501010000001111bbbb1111000040b8c7a07322128f500fb22e20f49172625000b37bc04b517a06e692166398b1effcd417000000009a0204008d062800",
"c35200502004000001000501010000001111cccc1111000040b8f29632653297a5f9d0886591215700100000210ee0b344a48ee2024c54298e23d4e3000000009a0204008d062800",
"c3520050210400000100050101000000111177771111000040b8f9e1e24a262f2d7dd15e2b9227d3fd55f77d785f80c2b9fea7f6f61c2c3c90a7d420000000009a0204008d062800",
"c3520050220400000100050101000000111188881111000040b897e61f28165d9829c12a0ae801c66030a0901109a8bc40c746df22e42a041f40d43d000000009a0204008d062800",
"c3520050230400000100050101000000111199991111000040b857ec00e51c404a33da3be161214290702201472764e6b114ae968ebfcf22d19ad4d5000000009a0204008d062800",
"c35200502404000001000501010000001111aaaa1111000040b8a595be387fb9242de0f73dd741518071c36049f7c8a545398cd16e47e14a375cd44d000000009a0204008d062800",
"c35200502504000001000501010000001111bbbb1111000040b8e8c06745ba1ccb8fc2fd44e90152625000b37bc0e8b9ddd9ecd67002f8156a68d47e000000009a0204008d062800",
"c35200502604000001000501010000001111cccc1111000040b8fce67640302bd5f8e466248a51b000100000210e20ccb2dcbff09240ae6a3cb0d402000000009a0204008d062800",
"c3520050270400000100050101000000111177771111000040b8baf0924352e4dfdbd6d727ab37a3fd55f77d745f30ddc498d8e0b46b551c6f0ed45c000000009a0204008d062800",
"c3520050280400000100050101000000111188881111000040b8aee130a3560d1888f1adc08e21506030a09014096c8fd196feb254a5e605e7add45c000000009a0204008d062800",
]
for msg in msgs:
ipsc: IpSiteConnectProtocol = IpSiteConnectProtocol.from_bytes(
bytes.fromhex(msg)
)
b: Burst = Burst.from_hytera_ipsc(ipsc=ipsc)
print(repr(b))
assert len(repr(b))
# no objects serialization, so bits are equal
assert b.full_bits == b.as_bits()


def test_rc():
msgs: List[str] = [
"c35200503203000001000501010000001111cccc1111000040b8efdaf48911064091ff93519401ac00809207b622a1f52e62afef70c2f76446b5b80a00000000060100007f592800"
]
for msg in msgs:
ipsc: IpSiteConnectProtocol = IpSiteConnectProtocol.from_bytes(
bytes.fromhex(msg)
)
b: Burst = Burst.from_hytera_ipsc(ipsc=ipsc)
print(repr(b))
assert len(repr(b))
# no objects serialization, so bits are equal
assert b.full_bits == b.as_bits()

0 comments on commit 108e33f

Please sign in to comment.