From c45daeb2003c7c0c531707a4124ceb21f6cf544e Mon Sep 17 00:00:00 2001 From: john <4852923-john---@users.noreply.gitlab.com> Date: Fri, 1 Mar 2024 17:13:56 -0600 Subject: [PATCH] reorganize scan_cycle function --- README.md | 4 +- apps/cursesgui.py | 61 ++++++------- apps/h2h_types.py | 16 ++++ apps/ham2mon.py | 4 +- apps/scanner.py | 217 +++++++++++++++++++++------------------------- 5 files changed, 152 insertions(+), 150 deletions(-) create mode 100644 apps/h2h_types.py diff --git a/README.md b/README.md index 2a541a3..2ae5cdc 100644 --- a/README.md +++ b/README.md @@ -216,7 +216,7 @@ The demodulator blocks are put into a hierarchical GR block so multiple can be i The scanner.py contains the control code, and may be run on on it's own non-interactively. It instantiates the receiver.py with N demodulators and probes the average spectrum at ~10 Hz. The spectrum is processed with estimate.py, which takes a weighted average of the spectrum bins that are above a threshold. This weighted average does a fair job of estimating the modulated channel center to sub-kHz resolution given the RBW is several kHz. The estimate.py returns a list of baseband channels that are rounded to the nearest 5 kHz (for NBFM band plan ambiguity). -The lockout channels are removed from the list and the list used to tune the demodulators. The demodulators are only tuned if the channel has ceased activity from the last probe or if a higher priority channel has activity. Otherwise, the demodulator is held on the channel. The demodulators are parked at 0 Hz baseband when not tuned, as this provides a constant, low amplitude signal due to FM demod of LO leakage. +The list used to tune the demodulators (lockout channels are skipped). The demodulators are only tuned if the channel has ceased activity from the last probe or if a higher priority channel has activity. Otherwise, the demodulator is held on the channel. The demodulators are parked at 0 Hz baseband when not tuned, as this provides a constant, low amplitude signal due to FM demod of LO leakage. The ham2mon.py interfaces the scanner.py with the curses.py GUI. The GUI provides a spectral display with adjustable scaling and detector threshold line. The center frequency, gain, squelch, and volume can be adjusted in real time, as well as adding channel lockouts. The hardware arguments, sample rate, number of demodulators, recording status, and lockout file are set via switches at run time. @@ -227,7 +227,7 @@ The next iteration of this program will probably use gr-dsd to decode P25 public ## Priority File The Priority file contains a frequency (in Hz) in each line. The frequencies are to be arranged in descending priority order. Therefore, the highest priority frequenncy will be the one at the top. -When the scanner detects a priority frequency it will demodulate that frequency over any one that is lower priority. Without a priority file the scanner will only demodulate a frequency if there is a demodulator that is inactive. +When the scanner detects a priority frequency it will demodulate that frequency over any one that is lower priority. Priority channels with be flagged with a 'P' in the CHANNELS section. Without a priority file the scanner will only demodulate a frequency if there is a demodulator that is inactive. A use case for this is as follows: You want to hear something, anything, but want to hear certain things over other things if they're actually happening. In this case there would be one demodulator being fed to the speakers. To do this, place local repeaters (in priority order) into the priority file. If all of the repeaters are idle, other frequencies will be heard. However, if a channel more important becomes active, those of lessor importance will be dropped in favor of the channel with more priority. diff --git a/apps/cursesgui.py b/apps/cursesgui.py index 982e72b..c16d33f 100644 --- a/apps/cursesgui.py +++ b/apps/cursesgui.py @@ -6,11 +6,13 @@ @author: madengr """ import locale -locale.setlocale(locale.LC_ALL, '') import curses import time import numpy as np +import logging +from h2h_types import Channel +locale.setlocale(locale.LC_ALL, '') class SpectrumWindow(object): """Curses spectrum display window @@ -192,7 +194,7 @@ def __init__(self, screen): self.dims = self.win.getmaxyx() - def draw_channels(self, gui_tuned_channels, gui_active_channels): + def draw_channels(self, channels: list[Channel]): """Draws tuned channels list Args: @@ -208,33 +210,32 @@ def draw_channels(self, gui_tuned_channels, gui_active_channels): # Limit the displayed channels to no more than two rows max_length = 2*(self.dims[0]-2) - if len(gui_tuned_channels) > max_length: - gui_tuned_channels = gui_tuned_channels[:max_length] - else: - pass - - active_channels = set(gui_active_channels) # Draw the tuned channels prefixed by index in list (demodulator index) - # Use color if tuned channel is in active channel list during this scan_cycle - for idx, gui_tuned_channel in enumerate(gui_tuned_channels): - text = str(idx) - text = text.zfill(2) + ": " + f'{gui_tuned_channel:.3f}' + # Use color if tuned channel is active during this scan_cycle + subset = channels[:max_length] + subset = [c for c in subset if c.active or c.hanging] + for idx, channel in enumerate(subset): + icon = 'P' if channel.priority else '' + text = f'{idx:02d}: {channel.frequency:.3f}' if idx < self.dims[0]-2: # Display in first column # text color based on activity - # curses.color_pair(5) - if gui_tuned_channel in active_channels: - self.win.addnstr(idx+1, 1, text, 11, curses.color_pair(2) | curses.A_BOLD) + if channel.active: + self.win.addnstr(idx+1, 1, text, 12, curses.color_pair(2) | curses.A_BOLD) + self.win.addnstr(idx+1, 12, icon, 1, curses.color_pair(2)) else: - self.win.addnstr(idx+1, 1, text, 11, curses.color_pair(6)) + self.win.addnstr(idx+1, 1, text, 12, curses.color_pair(6)) + self.win.addnstr(idx+1, 12, icon, 1, curses.color_pair(6)) else: # Display in second column self.win.addnstr(idx-self.dims[0]+3, 13, text, 11) - if gui_tuned_channel in active_channels: - self.win.addnstr(idx-self.dims[0]+3, 13, text, 11, curses.color_pair(2)) + if channel.active: + self.win.addnstr(idx-self.dims[0]+3, 13, text, 11, curses.color_pair(2) | curses.A_BOLD) + self.win.addnstr(idx-self.dims[0]+3, 24, icon, 1, curses.color_pair(2)) else: - self.win.addnstr(idx-self.dims[0]+3, 13, text, 11) + self.win.addnstr(idx-self.dims[0]+3, 13, text, 11, curses.color_pair(6)) + self.win.addnstr(idx-self.dims[0]+3, 24, icon, 1, curses.color_pair(6)) # Hide cursor self.win.leaveok(1) @@ -264,7 +265,7 @@ def __init__(self, screen): self.win = curses.newwin(height, width, screen_dims[0] - height - 1, width+1) self.dims = self.win.getmaxyx() - def draw_channels(self, gui_lockout_channels, gui_active_channels): + def draw_channels(self, gui_lockout_channels, channels: list[Channel]): """Draws lockout channels list Args: @@ -277,23 +278,23 @@ def draw_channels(self, gui_lockout_channels, gui_active_channels): self.win.addnstr(0, int(self.dims[1]/2-3), "LOCKOUT", 7, curses.color_pair(6) | curses.A_DIM | curses.A_BOLD) - active_channels = set(gui_active_channels) - # Draw the lockout channels # Use color if lockout channel is in active channel list during this scan_cycle - for idx, lockout_channel in enumerate(gui_lockout_channels): + locked_channels = [c for c in channels if c.locked] + for idx, lockout in enumerate(gui_lockout_channels): # Don't draw past height of window if idx <= self.dims[0]-3: attr = curses.color_pair(6) - if isinstance(lockout_channel, dict): # handle this range - text = f"{lockout_channel['min']:.3f}-{lockout_channel['max']:.3f}" - for channel in active_channels: - if lockout_channel['min'] <= channel <= lockout_channel['max']: + if isinstance(lockout, dict): # handle this range + text = f"{lockout['min']:.3f}-{lockout['max']:.3f}" + for channel in locked_channels: + if lockout['min'] <= channel.frequency <= lockout['max']: attr = curses.color_pair(5) | curses.A_BOLD else: # handle this single frequency - text = f"{lockout_channel:.3f}" - if lockout_channel in active_channels: - attr = curses.color_pair(5) | curses.A_BOLD + text = f"{lockout:.3f}" + for channel in locked_channels: + if lockout == channel.frequency: + attr = curses.color_pair(5) | curses.A_BOLD self.win.addnstr(idx+1, 1, text, 20, attr) else: pass diff --git a/apps/h2h_types.py b/apps/h2h_types.py new file mode 100644 index 0000000..8957e6a --- /dev/null +++ b/apps/h2h_types.py @@ -0,0 +1,16 @@ +""" +Created on Thu Feb 29 09:00:22 2024 + +@author: john +""" + +from dataclasses import dataclass + +@dataclass(kw_only=True) +class Channel: + baseband: int + frequency: float + locked: bool + active: bool + priority: bool + hanging: bool diff --git a/apps/ham2mon.py b/apps/ham2mon.py index 5107b00..f1b8eee 100644 --- a/apps/ham2mon.py +++ b/apps/ham2mon.py @@ -98,8 +98,8 @@ async def cycle(self): # Update the spectrum, channel, and rx displays self.specwin.draw_spectrum(self.scanner.spectrum) - self.chanwin.draw_channels(self.scanner.gui_tuned_channels, self.scanner.gui_active_channels) - self.lockoutwin.draw_channels(self.scanner.gui_lockout_channels, self.scanner.gui_active_channels) + self.chanwin.draw_channels(self.scanner.get_channels()) + self.lockoutwin.draw_channels(self.scanner.gui_lockout_channels, self.scanner.get_channels()) self.rxwin.draw_rx() # Update physical screen diff --git a/apps/scanner.py b/apps/scanner.py index 71edb3d..753d769 100644 --- a/apps/scanner.py +++ b/apps/scanner.py @@ -8,39 +8,16 @@ import receiver as recvr import estimate import h2m_parser as prsr +from h2h_types import Channel import time import numpy as np import sys -import types import datetime import errors as err import yaml import logging - -PY3 = sys.version_info[0] == 3 -PY2 = sys.version_info[0] == 2 - -if PY3: - import builtins - # list-producing versions of the major Python iterating functions - def lrange(*args, **kwargs): - return list(range(*args, **kwargs)) - - def lzip(*args, **kwargs): - return list(zip(*args, **kwargs)) - - def lmap(*args, **kwargs): - return list(map(*args, **kwargs)) - - def lfilter(*args, **kwargs): - return list(filter(*args, **kwargs)) -else: - import __builtin__ - # Python 2-builtin ranges produce lists - lrange = __builtin__.range - lzip = __builtin__.zip - lmap = __builtin__.map - lfilter = __builtin__.filter +from numpy.typing import NDArray +import builtins class Scanner(object): """Scanner that controls receiver @@ -81,9 +58,6 @@ class Scanner(object): spectrum (numpy.ndarray): FFT power spectrum data in linear, not dB lockout_channels [float]: List of baseband lockout channels in Hz priority_channels [float]: List of baseband priority channels in Hz - gui_tuned_channels [str] List of tuned RF channels in MHz for GUI - gui_active_channels [str] List of active RF channels in MHz for GUI (currently above threshold) - gui_tuned_lockout_channels [str]: List of lockout channels in MHz GUI channel_spacing (float): Spacing that channels will be rounded lockout_file_name (string): Name of file with channels to lockout priority_file_name (string): Name of file with channels for priority @@ -115,9 +89,7 @@ def __init__(self, ask_samp_rate=4E6, num_demod=4, type_demod=0, self.spectrum = [] self.lockout_channels = [] self.priority_channels = [] - self.active_channels = [] - self.gui_tuned_channels = [] - self.gui_active_channels = [] + self._enriched_channels = list[Channel] self.gui_lockout_channels = [] self.channel_spacing = channel_spacing self.lockout_file_name = lockout_file_name @@ -125,7 +97,6 @@ def __init__(self, ask_samp_rate=4E6, num_demod=4, type_demod=0, self.channel_log_file_name = channel_log_file_name self.channel_log_file = None self.channel_log_timeout = channel_log_timeout - self.log_recent_channels = [] self.log_timeout_last = int(time.time()) self.log_mode = "" self.hang_time = 1.0 @@ -167,7 +138,7 @@ def __del__(self): if self.channel_log_file != None : self.channel_log_file.close() - def __print_channel_log_active__(self, freq, state): + def _print_channel_log_active(self, freq, state): if self.log_mode is not None and self.log_mode != "none" and state is True: state_str = {True: "act", False: "off"} now = datetime.datetime.now() @@ -188,7 +159,7 @@ def __print_channel_log_active__(self, freq, state): # cannot log unknown mode raise(err.LogError("unknown","no log mode defined")) - def __print_channel_log__(self, freq, state, idx): + def _print_channel_log(self, freq, state, idx): if self.log_mode is not None and self.log_mode != "none": state_str = {True: "on", False: "off"} if state == False: @@ -211,37 +182,41 @@ def __print_channel_log__(self, freq, state, idx): # cannot log unknown mode raise(err.LogError("unknown","no log mode defined")) - def scan_cycle(self): + def scan_cycle(self) -> None: """Execute one scan cycle Should be called no more than 10 Hz rate Estimates channels from FFT power spectrum that are above threshold Rounds channels to nearest 5 kHz - Removes channels that are already a priority Moves priority channels in front - Removes channels that are locked out Tunes demodulators to new channels Holds demodulators on channels between scan cycles - Creates RF channel lists for GUI + Add metadata to channels for GUI and further processing + Log recent active channels """ - # pylint: disable=too-many-branches - # Retune demodulators that are locked out - # TODO: this looks to be missing range lockouts and only checks frequency lockouts - # also see below where locked out channels and ranges are removed - for demodulator in self.receiver.demodulators: - if demodulator.center_freq in self.lockout_channels: - demodulator.set_center_freq(0, self.center_freq) - else: - pass + channels = self._get_raw_channels() + self._process_current_demodulators(channels) + + self._assign_channels_to_demodulators(channels) + + self._enriched_channels = self._add_metadata(channels) + + self._log_recent_active_channels() + + def get_channels(self) -> list[Channel]: + return self._enriched_channels + + def _get_raw_channels(self) -> NDArray: # Grab the FFT data, set threshold, and estimate baseband channels self.spectrum = self.receiver.probe_signal_vf.level() threshold = 10**(self.threshold_db/10.0) - channels = np.array(estimate.channel_estimate(self.spectrum, threshold)) + channels = np.array( + estimate.channel_estimate(self.spectrum, threshold)) # Convert channels from bin indices to baseband frequency in Hz - channels = (channels-len(self.spectrum)/2)*\ + channels = (channels-len(self.spectrum)/2) *\ self.samp_rate/len(self.spectrum) # Round channels to channel spacing @@ -250,86 +225,97 @@ def scan_cycle(self): # Note that channel spacing is with respect to the center + baseband offset, # not just the offset itself real_channels = channels + self.center_freq - real_channels = np.round(real_channels / self.channel_spacing) * self.channel_spacing - channels = real_channels - self.center_freq + real_channels = np.round( + real_channels / self.channel_spacing) * self.channel_spacing + channels = (real_channels - self.center_freq).astype(int) + # Remove 0 as this also represents an unassigned demodulator + # As a result, valid signals at the center point will be ignored + channels = channels[channels != 0] - # set active channels for gui highlight before filtering down lockout or adding priority - active_channels = channels + return channels - # remove channels and ranges that are locked out - temp = [] - for channel in channels: - if not self.locked_out(channel): - temp = np.append(temp, channel) - else: - pass - channels = temp + def _process_current_demodulators(self, channels: NDArray) -> None: - # Update demodulator last heards and expire old ones the_now = time.time() for idx in range(len(self.receiver.demodulators)): demodulator = self.receiver.demodulators[idx] - if (demodulator.center_freq != 0) and (demodulator.center_freq not in channels): + if demodulator.center_freq == 0: + continue + + # Stop locked out demodulator + if self.locked_out(demodulator.center_freq): + demodulator.set_center_freq(0, self.center_freq) + continue + + # Stop the demodulator if not being scanned and outside the hang time + if demodulator.center_freq not in channels: if the_now - demodulator.last_heard > self.hang_time: demodulator.set_center_freq(0, self.center_freq) # Write in channel log file that the channel is off demodulator_freq = demodulator.center_freq - self.__print_channel_log__(demodulator_freq + self.center_freq, False, idx) + # TODO: Does logging need to occur if demodulators zeroed out elsewhere? + self._print_channel_log( + demodulator_freq + self.center_freq, False, idx) else: - #pass demodulator.set_last_heard(the_now) + # Stop any long running modulators + if self.max_recording > 0: + if time.time() - demodulator.time_stamp >= self.max_recording: + # clear the demodulator to reset file + demodulator.set_center_freq(0, self.center_freq) + + def _assign_channels_to_demodulators(self, channels: NDArray) -> None: + + # assign channels to available demodulators for channel in channels: # If channel not in demodulators - if channel not in self.receiver.get_demod_freqs(): + if channel not in self.receiver.get_demod_freqs() and not self.locked_out(channel): # Sequence through each demodulator for idx in range(len(self.receiver.demodulators)): demodulator = self.receiver.demodulators[idx] # If channel is higher priority than what is being demodulated if self.is_higher_priority(channel, demodulator.center_freq): # Write in channel log file that the channel is on - self.__print_channel_log__(channel + self.center_freq, True, idx) + self._print_channel_log( + channel + self.center_freq, True, idx) # Assigning channel to empty demodulator - demodulator.set_center_freq(channel, self.center_freq) + demodulator.set_center_freq( + channel, self.center_freq) break else: pass else: pass - # Stop any long running modulators - if self.max_recording > 0: - for demodulator in self.receiver.demodulators: - if (demodulator.center_freq != 0) and \ - (time.time() - demodulator.time_stamp >= self.max_recording): - temp_freq = demodulator.center_freq - # clear the demodulator to reset file - demodulator.set_center_freq(0, self.center_freq) - # reset the demodulator to its frequency to restart file - demodulator.set_center_freq(0, temp_freq) - - # Create an tuned channel list of strings for the GUI in Mhz - # If channel is a zero then use an empty string - self.gui_tuned_channels = [] - for demod_freq in self.receiver.get_demod_freqs(): - if demod_freq != 0: - # Calculate actual RF frequency in Mhz - gui_tuned_channel = (demod_freq + \ - self.center_freq)/1E6 - self.gui_tuned_channels.append(gui_tuned_channel) - - # Create an active channel list of strings for the GUI in Mhz - # This is any channel above threshold - # do not include priority if not above threshold - # do include lockout if above threshold - self.gui_active_channels = [] - for channel in active_channels: - # calculate active channel freq in MHz - gui_active_channel = (channel + self.center_freq)/1E6 - self.gui_active_channels.append(gui_active_channel) - # Add active channel to recent list for logging if not already there - if gui_active_channel not in self.log_recent_channels: - self.log_recent_channels.append(gui_active_channel) + def _add_metadata(self, active_channels: NDArray) -> list[Channel]: + + demod_freqs = self.receiver.get_demod_freqs() + + # If a demodulator is not in channel list than it is waiting for hang time to end + # There is no activity on it so it was not in the scan + all_channels = active_channels # start out with the active channels + + for demod_freq in demod_freqs: + if demod_freq != 0 and demod_freq not in all_channels: + all_channels = np.append(all_channels, demod_freq) + + sweep: list[Channel] = [] + for channel in all_channels: + frequency = self._baseband_to_frequency(channel) + priority: bool = channel in self.priority_channels + idx = 0 if priority else len(sweep) # priority channels up front + sweep.insert(idx, Channel(baseband=channel, + frequency=frequency, + locked=self.locked_out(channel), + active=channel in demod_freqs and channel in active_channels, + priority=priority, + hanging=channel in demod_freqs and channel not in active_channels)) + + return sweep + + def _log_recent_active_channels(self) -> None: + # TODO: should this functionality be moved into the demodulator? # log recently active channels if we are beyond timeout delay from last logging # clear list of recently active channels after logging @@ -340,13 +326,14 @@ def scan_cycle(self): # set last timeout to this timestamp self.log_timeout_last = cur_timestamp # iterate all recent channels print to log - for channel in self.log_recent_channels: + log_channels = [ + c for c in self._enriched_channels if c.active or c.hanging] + for channel in log_channels: # Write in channel log file that the channel is on - self.__print_channel_log_active__(float(channel)*1E6, True) - # clear recent channels - self.log_recent_channels = [] + self._print_channel_log_active( + float(channel.frequency)*1E6, True) - def is_higher_priority(self, channel, demod_freq): + def is_higher_priority(self, channel: int, demod_freq: int) -> bool: if demod_freq == 0: return True @@ -366,7 +353,7 @@ def is_higher_priority(self, channel, demod_freq): else: return False - def locked_out(self, channel): + def locked_out(self, channel: int) -> bool: locked = False for lockout_channel in self.lockout_channels: if isinstance(lockout_channel, dict): # is range this range locked out? @@ -377,9 +364,10 @@ def locked_out(self, channel): locked = True return locked - def _generate_gui_lockout_channels(self): + def _generate_gui_lockout_channels(self) -> None: # Create a lockout channel list of strings for the GUI in Mhz self.gui_lockout_channels = [] + gui_lockout_channel: dict[str, float] | float for lockout_channel in self.lockout_channels: if isinstance(lockout_channel, dict): # add the range lockout gui_lockout_channel = {'min': self._baseband_to_frequency(lockout_channel['min']), 'max': self._baseband_to_frequency(lockout_channel['max'])} @@ -388,7 +376,7 @@ def _generate_gui_lockout_channels(self): self.gui_lockout_channels.append(gui_lockout_channel) - def add_lockout(self, idx): + def add_lockout(self, idx: int) -> None: """Adds baseband frequency to lockout channels and updates GUI list Args: @@ -404,12 +392,12 @@ def add_lockout(self, idx): self._generate_gui_lockout_channels() - def _frequency_to_baseband(self, freq): + def _frequency_to_baseband(self, freq: float) -> int: bb_freq = float(freq) * 1E6 - self.center_freq bb_freq = round(bb_freq/self.channel_spacing) * self.channel_spacing return bb_freq - def _baseband_to_frequency(self, bb_freq): + def _baseband_to_frequency(self, bb_freq: int) -> float: return (bb_freq + self.receiver.center_freq)/1E6 def clear_lockout(self): @@ -448,10 +436,7 @@ def update_priority(self): with open(self.priority_file_name) as priority_file: lines = priority_file.read().splitlines() priority_file.close() - if PY3: - lines = builtins.filter(None, lines) - else: - lines = __builtin__.filter(None, lines) + lines = builtins.filter(None, lines) # Convert to baseband frequencies, round, and append if within BW for freq in lines: bb_freq = float(freq) - self.center_freq