diff --git a/okdmr/dmrlib/etsi/fec/hamming_common.py b/okdmr/dmrlib/etsi/fec/hamming_common.py index 23c9b65..483529d 100644 --- a/okdmr/dmrlib/etsi/fec/hamming_common.py +++ b/okdmr/dmrlib/etsi/fec/hamming_common.py @@ -56,9 +56,12 @@ def check_and_correct(cls, bits: bitarray) -> (bool, bitarray): numpy.array(bits.tolist()), cls.PARITY_CHECK_MATRIX ).tolist() try: + # print("syndrome", syndrome) + # print(cls.PARITY_CHECK_MATRIX.T.tolist()) bits.invert(cls.PARITY_CHECK_MATRIX.T.tolist().index(syndrome)) check = True except ValueError as e: + print(e) # ValueError is thrown in case syndrome is not found in parity check result, making the message uncorrectable return False, bits diff --git a/okdmr/dmrlib/etsi/layer2/burst.py b/okdmr/dmrlib/etsi/layer2/burst.py index 735bb5b..ac289d9 100644 --- a/okdmr/dmrlib/etsi/layer2/burst.py +++ b/okdmr/dmrlib/etsi/layer2/burst.py @@ -196,7 +196,7 @@ def __repr__(self) -> str: and self.emb.preemption_and_power_control_indicator == PreemptionPowerIndicator.CarriesReverseChannelInformation ): - status += f"[RC Info {self.embedded_signalling_bits}] " + status += f"[RC Info {self.embedded_signalling_bits} hex {self.embedded_signalling_bits.tobytes().hex()}] " if self.is_vocoder: status += f" [{self.voice_burst}]" diff --git a/okdmr/dmrlib/tools/pcap_tool.py b/okdmr/dmrlib/tools/pcap_tool.py index 6c3e7fc..6f01232 100644 --- a/okdmr/dmrlib/tools/pcap_tool.py +++ b/okdmr/dmrlib/tools/pcap_tool.py @@ -13,6 +13,7 @@ PreemptionPowerIndicator, ) from okdmr.dmrlib.etsi.layer2.pdu.full_link_control import FullLinkControl +from okdmr.dmrlib.etsi.layer2.pdu.reverse_channel import ReverseChannel from okdmr.dmrlib.transmission.transmission_watcher import TransmissionWatcher from okdmr.dmrlib.utils.parsing import try_parse_packet from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020 @@ -30,21 +31,34 @@ class EmbeddedExtractor: Helper class, collects """ - def __init__(self): + def __init__(self, filter_rc: bool = False): self.data: Dict[str, Tuple[LCSS, bitarray]] = {} + self.filter_rc: bool = filter_rc def process_packet(self, data: bytes, packet: IP) -> Optional[FullLinkControl]: burst: Optional[Burst] = PcapTool.debug_packet( data=data, packet=packet, hide_unknown=True, silent=True ) + if not burst or not burst.has_emb: + return + + if self.filter_rc: + if ( + burst.emb.preemption_and_power_control_indicator + == PreemptionPowerIndicator.CarriesReverseChannelInformation + ): + # print("FOUND RC") + print(data.hex()) + print(repr(burst)) + print(burst.embedded_signalling_bits.to01()) + return + if ( - not burst - or not burst.has_emb - or burst.emb.link_control_start_stop == LCSS.SingleFragmentLCorCSBK - or burst.emb.preemption_and_power_control_indicator + burst.emb.preemption_and_power_control_indicator == PreemptionPowerIndicator.CarriesReverseChannelInformation ): + # skip RC in following embedded extraction return full_lc: Optional[FullLinkControl] = None @@ -451,6 +465,13 @@ def _arguments() -> ArgumentParser: default=[], help='Filter traffic by "ORIGIN" IP address(es)', ) + parser.add_argument( + "--filter-rc", + dest="filter_rc", + default=False, + action="store_true", + help="Extract only bursts embedding RC by PreemptionPowerIndicator.CarriesReverseChannelInformation", + ) return parser @staticmethod @@ -478,8 +499,9 @@ def main( watcher = TransmissionWatcher().set_debug_voice_bytes( do_debug=args.debug_vocoder_bytes ) - if args.extract_embedded_lc: - callback = EmbeddedExtractor().process_packet + + if args.extract_embedded_lc or args.filter_rc: + callback = EmbeddedExtractor(filter_rc=args.filter_rc).process_packet elif args.observe_transmissions: callback = watcher.process_packet finish_callback = watcher.end_all_transmissions @@ -497,7 +519,7 @@ def main( finish_callback=finish_callback, ) - if args.analyze_ipsc: + if args.analyze_ipsc and not args.no_statistics: ipsc_analyze.print_stats() if return_stats: diff --git a/okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py b/okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py index 6b08450..9f8a537 100644 --- a/okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py +++ b/okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py @@ -1,3 +1,7 @@ +from typing import List, Tuple + +from okdmr.dmrlib.etsi.fec.hamming_16_11_4 import Hamming16114 +from okdmr.dmrlib.etsi.fec.vbptc_32_11 import VBPTC3211 from okdmr.dmrlib.etsi.layer2.pdu.reverse_channel import ReverseChannel from bitarray import bitarray @@ -5,15 +9,86 @@ def test_reverse_channel(): + merged_bits_odds = bitarray(f"01000000001101011" + f"10111111110010100") + + print( + "ODD PARITY" + if (merged_bits_odds.count(0) == merged_bits_odds.count(1)) + else "EVEN PARITY" + ) + print( + f"ZEROES {merged_bits_odds.count(0)}\tONES {merged_bits_odds.count(1)}\t\tLEN {len(merged_bits_odds)}" + ) + + merged_bits_even = bitarray(f"01000000010010000" + f"01000000010010000") + + print( + "ODD PARITY" + if (merged_bits_even.count(0) == merged_bits_even.count(1)) + else "EVEN PARITY" + ) + print( + f"ZEROES {merged_bits_even.count(0)}\tONES {merged_bits_even.count(1)}\t\tLEN {len(merged_bits_even)}" + ) + rcs: List[Tuple[str,]] = [ + ("00000000000000000111100100100010",), + ("00010000100000111000101000001010",), + ("01000000100100000000000000000000",), + ("01001100011111111110100101011010",), + ("01010101011101101000010110011111",), ("01011011011000001010111000010101",), - ("11111000000001100000011001000101",), + ("01011100111000010100011101111011",), + ("01101111100100100001010100111000",), + ("10001111110111110000010101010111",), + ("10010001101101110100111001010101",), + ("10011010011111101010111100001100",), + ("10110100101110001110101110010100",), ("10110101101110001110111110000100",), + ("11001100111111101100001011100101",), ("11101111101110001010111010010100",), - # ("",), + ("11111000000001100000011001000101",), + ("11111111111011111100111111111011",), + ("11111111111111111111000101111011",), + ("11111111111111111111111111111011",), + ("11111111111111111111111111111110",), + ("11111111111111111111111111111111",), ] for (rc,) in rcs: + print("-!-" * 20) rc_bits: bitarray = bitarray(rc) + rc_bits.reverse() rc_pdu: ReverseChannel = ReverseChannel.from_bits(rc_bits) print(repr(rc_pdu)) - assert rc_pdu.as_bits() == rc_bits + if False: + print("interleaved") + print(rc_bits) + print("deinterleaved") + rc_bits_deinterleaved = VBPTC3211.deinterleave_all_bits(rc_bits) + print(rc_bits_deinterleaved) + + rc_idx = [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] + h_idx = [22, 24, 26, 28, 30] + pc_icx = [17, 19, 21, 23, 25, 27, 29, 31, 1, 3, 5, 7, 9, 11, 13, 15] + + rc2_bits = bitarray([rc_bits[x] for x in rc_idx]) + h_bits = bitarray([rc_bits[x] for x in h_idx]) + pc_bits = bitarray([rc_bits[x] for x in pc_icx]) + + if False: + print("manually") + + print("ODD PARITY" if (rc_bits.count(0) == rc_bits.count(1)) else "EVEN PARITY") + if False: + h_bits.reverse() + rc2_bits.reverse() + pc_bits.reverse() + + print(rc2_bits + h_bits) + print(pc_bits) + + hamming_data = h_bits + rc2_bits + # hamming_success, corrected_bits = Hamming16114.check_and_correct(hamming_data) + # print(f"hamming status: {'OK' if hamming_success else 'FAILED'}") + + # assert rc_pdu.as_bits() == rc_bits diff --git a/okdmr/tests/dmrlib/hytera/test_hytera_ipsc.py b/okdmr/tests/dmrlib/hytera/test_hytera_ipsc.py index 8e6945a..7aee24b 100644 --- a/okdmr/tests/dmrlib/hytera/test_hytera_ipsc.py +++ b/okdmr/tests/dmrlib/hytera/test_hytera_ipsc.py @@ -51,3 +51,43 @@ def test_wakeup(): b.deinterleave(bits=b.full_bits, data_type=DataTypes.Reserved) == b.full_bits ) + + +def test_multisite(): + msgs: List[str] = [ + "c35200502004000001000501010000001111cccc1111000040b8f29632653297a5f9d0886591215700100000210ee0b344a48ee2024c54298e23d4e3000000009a0204008d062800", + "c35200501f04000001000501010000001111bbbb1111000040b8c7a07322128f500fb22e20f49172625000b37bc04b517a06e692166398b1effcd417000000009a0204008d062800", + "c35200502004000001000501010000001111cccc1111000040b8f29632653297a5f9d0886591215700100000210ee0b344a48ee2024c54298e23d4e3000000009a0204008d062800", + "c3520050210400000100050101000000111177771111000040b8f9e1e24a262f2d7dd15e2b9227d3fd55f77d785f80c2b9fea7f6f61c2c3c90a7d420000000009a0204008d062800", + "c3520050220400000100050101000000111188881111000040b897e61f28165d9829c12a0ae801c66030a0901109a8bc40c746df22e42a041f40d43d000000009a0204008d062800", + "c3520050230400000100050101000000111199991111000040b857ec00e51c404a33da3be161214290702201472764e6b114ae968ebfcf22d19ad4d5000000009a0204008d062800", + "c35200502404000001000501010000001111aaaa1111000040b8a595be387fb9242de0f73dd741518071c36049f7c8a545398cd16e47e14a375cd44d000000009a0204008d062800", + "c35200502504000001000501010000001111bbbb1111000040b8e8c06745ba1ccb8fc2fd44e90152625000b37bc0e8b9ddd9ecd67002f8156a68d47e000000009a0204008d062800", + "c35200502604000001000501010000001111cccc1111000040b8fce67640302bd5f8e466248a51b000100000210e20ccb2dcbff09240ae6a3cb0d402000000009a0204008d062800", + "c3520050270400000100050101000000111177771111000040b8baf0924352e4dfdbd6d727ab37a3fd55f77d745f30ddc498d8e0b46b551c6f0ed45c000000009a0204008d062800", + "c3520050280400000100050101000000111188881111000040b8aee130a3560d1888f1adc08e21506030a09014096c8fd196feb254a5e605e7add45c000000009a0204008d062800", + ] + for msg in msgs: + ipsc: IpSiteConnectProtocol = IpSiteConnectProtocol.from_bytes( + bytes.fromhex(msg) + ) + b: Burst = Burst.from_hytera_ipsc(ipsc=ipsc) + print(repr(b)) + assert len(repr(b)) + # no objects serialization, so bits are equal + assert b.full_bits == b.as_bits() + + +def test_rc(): + msgs: List[str] = [ + "c35200503203000001000501010000001111cccc1111000040b8efdaf48911064091ff93519401ac00809207b622a1f52e62afef70c2f76446b5b80a00000000060100007f592800" + ] + for msg in msgs: + ipsc: IpSiteConnectProtocol = IpSiteConnectProtocol.from_bytes( + bytes.fromhex(msg) + ) + b: Burst = Burst.from_hytera_ipsc(ipsc=ipsc) + print(repr(b)) + assert len(repr(b)) + # no objects serialization, so bits are equal + assert b.full_bits == b.as_bits()