diff --git a/apps/cursesgui.py b/apps/cursesgui.py index c567fb5..a6d9b69 100644 --- a/apps/cursesgui.py +++ b/apps/cursesgui.py @@ -20,17 +20,17 @@ class SpectrumWindow(object): screen (object): a curses screen object Attributes: - max_db (float): Top of window in dB - min_db (float): Bottom of window in dB - threshold_db (float): Threshold horizontal line + max_db (int): Top of window in dB + min_db (int): Bottom of window in dB + threshold_db (int): Threshold horizontal line """ def __init__(self, screen): self.screen = screen # Set default values - self.max_db = 50.0 - self.min_db = -20.0 - self.threshold_db = 20.0 + self.max_db = 50 + self.min_db = -20 + self.threshold_db = 20 # Create a window object in top half of the screen, within the border screen_dims = screen.getmaxyx() @@ -137,7 +137,7 @@ def draw_spectrum(self, data): # Update virtual window self.win.noutrefresh() - def proc_keyb(self, keyb): + def proc_keyb(self, keyb: int): """Process keystrokes Args: @@ -305,7 +305,7 @@ def draw_channels(self, gui_lockout_channels, channels: list[Channel]): # Update virtual window self.win.noutrefresh() - def proc_keyb_set_lockout(self, keyb): + def proc_keyb_set_lockout(self, keyb: int): """Process keystrokes to lock out channels 0 - 9 Args: @@ -322,7 +322,7 @@ def proc_keyb_set_lockout(self, keyb): else: return False - def proc_keyb_clear_lockout(self, keyb): + def proc_keyb_clear_lockout(self, keyb: int): """Process keystrokes to clear lockout with "l" Args: @@ -541,7 +541,7 @@ def draw_rx(self): # Update virtual window self.win.noutrefresh() - def proc_keyb_hard(self, keyb): + def proc_keyb_hard(self, keyb: int): """Process keystrokes to adjust hard receiver settings Tune center_freq in 100 MHz steps with 'x' and 'c' @@ -612,7 +612,7 @@ def proc_keyb_hard(self, keyb): else: return False - def proc_keyb_soft(self, keyb): + def proc_keyb_soft(self, keyb: int): """Process keystrokes to adjust soft receiver settings Tune gain_db in 10 dB steps with 'g' and 'f' diff --git a/apps/h2m_parser.py b/apps/h2m_parser.py index 180868a..a7a653b 100644 --- a/apps/h2m_parser.py +++ b/apps/h2m_parser.py @@ -17,7 +17,7 @@ class CLParser(object): hw_args (string): Argument string to pass to harwdare num_demod (int): Number of parallel demodulators center_freq (float): Hardware RF center frequency in Hz - ask_samp_rate (float): Asking sample rate of hardware in sps (1E6 min) + ask_samp_rate (int): Asking sample rate of hardware in sps (1E6 min) gains : Enumerated gain types and values squelch_db (int): Squelch in dB volume_dB (int): Volume in dB @@ -61,7 +61,7 @@ def __init__(self): default=146E6, help="Hardware RF center frequency in Hz") - parser.add_argument("-r", "--rate", type=str, dest="ask_samp_rate", + parser.add_argument("-r", "--rate", type=float, dest="ask_samp_rate", default=4E6, help="Hardware ask sample rate in sps (1E6 minimum)") @@ -103,7 +103,7 @@ def __init__(self): dest="volume_db", default=0, help="Volume in dB") - parser.add_argument("-t", "--threshold", type=float, + parser.add_argument("-t", "--threshold", type=int, dest="threshold_db", default=10, help="Threshold in dB") @@ -183,7 +183,7 @@ def __init__(self): self.num_demod = int(options.num_demod) self.type_demod = int(options.type_demod) self.center_freq = float(options.center_freq) - self.ask_samp_rate = float(options.ask_samp_rate) + self.ask_samp_rate = int(options.ask_samp_rate) self.gains = [ { "name": "RF", "value": float(options.rf_gain_db) }, { "name": "LNA","value": float(options.lna_gain_db) }, @@ -198,7 +198,7 @@ def __init__(self): ] self.squelch_db = int(options.squelch_db) self.volume_db = int(options.volume_db) - self.threshold_db = float(options.threshold_db) + self.threshold_db = int(options.threshold_db) self.record = bool(options.record) self.play = bool(options.play) self.lockout_file_name = str(options.lockout_file_name) diff --git a/apps/lockout-example.yaml b/apps/lockout-example.yaml index f636b60..f9a34c5 100644 --- a/apps/lockout-example.yaml +++ b/apps/lockout-example.yaml @@ -2,13 +2,8 @@ frequencies: - 450.225 # unknown data - 451.125 - 460.4 - - 460.725 - - 467.710 - - 467.715 ranges: - min: 460.4 max: 460.8 - min: 124.125 max: 134.625 - - min: 467.0 # subset of FRS - max: 468.0 \ No newline at end of file diff --git a/apps/receiver.py b/apps/receiver.py index 10517ea..bfb5ae0 100644 --- a/apps/receiver.py +++ b/apps/receiver.py @@ -468,7 +468,8 @@ def __init__(self, ask_samp_rate: int=int(4E6), num_demod: int=4, type_demod: in raise # Default values - self.center_freq = int(144E6) + self.center_freq: int = int(144E6) + self.samp_rate: int self.squelch_db = -60 self.volume_db = 0 audio_rate = 8000 diff --git a/apps/scanner.py b/apps/scanner.py index 5a4a4b2..a91ed15 100644 --- a/apps/scanner.py +++ b/apps/scanner.py @@ -17,7 +17,6 @@ import yaml import logging from numpy.typing import NDArray -import builtins class Scanner(object): """Scanner that controls receiver @@ -25,11 +24,11 @@ class Scanner(object): Estimates channels from FFT power spectrum that are above threshold Rounds channels to nearest 5 kHz Removes channels that are locked out - Tunes demodulators to new channels + Tunes demodulators to new channels # gets overwritten by receiver value Holds demodulators on channels between scan cycles Args: - ask_samp_rate (float): Asking sample rate of hardware in sps (1E6 min) + ask_samp_rate (int): Asking sample rate of hardware in sps (1E6 min) num_demod (int): Number of parallel demodulators type_demod (int): Type of demodulator (0=NBFM, 1=AM) hw_args (string): Argument string to pass to harwdare @@ -43,21 +42,21 @@ class Scanner(object): center_freq (int): initial center frequency for receiver (Hz) spacing (int): granularity of frequency quantization min_recording (float): Minimum length of a recording in seconds - max_recording (int): Maximum length of a recording in seconds + max_recording (float): Maximum length of a recording in seconds Attributes: - center_freq (float): Hardware RF center frequency in Hz + center_freq (int): Hardware RF center frequency in Hz low_bound (int): Freq below which we won't tune a receiver (Hz) high_bound (int): Freq above which we won't tune a receiver (Hz) - samp_rate (float): Hardware sample rate in sps (1E6 min) + samp_rate (int): Hardware sample rate in sps (1E6 min) gains : Enumerated gain types and values squelch_db (int): Squelch in dB volume_dB (int): Volume in dB threshold_dB (int): Threshold for channel detection in dB spectrum (numpy.ndarray): FFT power spectrum data in linear, not dB lockout_channels [float]: List of baseband lockout channels in Hz - priority_channels [float]: List of baseband priority channels in Hz + priority_channels list[int]: List of baseband priority channels in Hz channel_spacing (float): Spacing that channels will be rounded lockout_file_name (string): Name of file with channels to lockout priority_file_name (string): Name of file with channels for priority @@ -68,15 +67,15 @@ class Scanner(object): # pylint: disable=too-many-instance-attributes # pylint: disable=too-many-arguments - def __init__(self, ask_samp_rate=4E6, num_demod=4, type_demod=0, - hw_args="uhd", freq_correction=0, record=True, - lockout_file_name="", priority_file_name="", - channel_log_file_name="", channel_log_timeout=15, - play=True, - audio_bps=8, channel_spacing=5000, - center_freq=0, - min_recording=0, max_recording=0, - classifier_params={'V':False,'D':False,'S':False }): + def __init__(self, ask_samp_rate: int=int(4E6), num_demod: int=4, type_demod: int=0, + hw_args: str="uhd", freq_correction: int=0, record: bool=True, + lockout_file_name: str="", priority_file_name: str="", + channel_log_file_name: str="", channel_log_timeout: int=15, + play: bool=True, + audio_bps: int=8, channel_spacing: int=5000, + center_freq: int=0, + min_recording: float=0, max_recording: float=0, + classifier_params: dict[str, bool]={'V':False,'D':False,'S':False }): # Default values self.squelch_db = -60 @@ -85,12 +84,13 @@ def __init__(self, ask_samp_rate=4E6, num_demod=4, type_demod=0, self.record = record self.play = play self.audio_bps = audio_bps - self.center_freq = center_freq - self.spectrum = [] - self.lockout_channels = [] - self.priority_channels = [] - self._enriched_channels = list[Channel] - self.gui_lockout_channels = [] + self.samp_rate: int + self.center_freq: int = center_freq # gets overwritten by receiver value + self.spectrum: NDArray = np.empty(0) + self.lockout_channels: list[dict[str, int] | int] = [] + self.priority_channels: list[int] = [] + self._enriched_channels: list[Channel] = [] + self.gui_lockout_channels: list[dict[str, float] | float] = [] self.channel_spacing = channel_spacing self.lockout_file_name = lockout_file_name self.priority_file_name = priority_file_name @@ -99,7 +99,7 @@ def __init__(self, ask_samp_rate=4E6, num_demod=4, type_demod=0, self.channel_log_timeout = channel_log_timeout self.log_timeout_last = int(time.time()) self.log_mode = "" - self.hang_time = 1.0 + self.hang_time: float = 1.0 self.max_recording = max_recording # Create receiver object @@ -113,7 +113,7 @@ def __init__(self, ask_samp_rate=4E6, num_demod=4, type_demod=0, # Open channel log file for appending data, if it is specified if channel_log_file_name != "": self.channel_log_file = open(channel_log_file_name, 'a') - if self.channel_log_file != None: + if self.channel_log_file is not None: self.log_mode = "file" else: # Opening log file failed so cannot perform this log mode @@ -131,11 +131,11 @@ def __init__(self, ask_samp_rate=4E6, num_demod=4, type_demod=0, self.receiver.start() time.sleep(1) - if self.channel_log_file != None : + if self.channel_log_file is not None : self.channel_log_file.flush() def __del__(self): - if self.channel_log_file != None : + if self.channel_log_file is not None : self.channel_log_file.close() def _print_channel_log_active(self, freq, state): @@ -162,7 +162,7 @@ def _print_channel_log_active(self, freq, state): def _print_channel_log(self, freq, state, idx): if self.log_mode is not None and self.log_mode != "none": state_str = {True: "on", False: "off"} - if state == False: + if state is False: freq = 0 now = datetime.datetime.now() if self.log_mode == "file" and self.channel_log_file is not None: @@ -201,7 +201,7 @@ def scan_cycle(self) -> None: self._assign_channels_to_demodulators(channels) - self._enriched_channels = self._add_metadata(channels) + self._add_metadata(channels) self._log_recent_active_channels() @@ -288,7 +288,7 @@ def _assign_channels_to_demodulators(self, channels: NDArray) -> None: else: pass - def _add_metadata(self, active_channels: NDArray) -> list[Channel]: + def _add_metadata(self, active_channels: NDArray) -> None: demod_freqs = self.receiver.get_demod_freqs() @@ -312,7 +312,7 @@ def _add_metadata(self, active_channels: NDArray) -> list[Channel]: priority=priority, hanging=channel in demod_freqs and channel not in active_channels)) - return sweep + self._enriched_channels = sweep def _log_recent_active_channels(self) -> None: # TODO: should this functionality be moved into the demodulator? @@ -365,14 +365,14 @@ def locked_out(self, channel: int) -> bool: return locked def _generate_gui_lockout_channels(self) -> None: - # Create a lockout channel list of strings for the GUI in Mhz + # Create a lockout channel list for the GUI in Mhz self.gui_lockout_channels = [] - gui_lockout_channel: dict[str, float] | float + gui_lockout_channel: dict[str, float] | float # list of ranges or single frequencies for lockout_channel in self.lockout_channels: if isinstance(lockout_channel, dict): # add the range lockout gui_lockout_channel = {'min': self._baseband_to_frequency(lockout_channel['min']), 'max': self._baseband_to_frequency(lockout_channel['max'])} else: # add the frequency lockout - gui_lockout_channel = self._baseband_to_frequency(lockout_channel) + gui_lockout_channel = self._baseband_to_frequency(int(lockout_channel)) self.gui_lockout_channels.append(gui_lockout_channel) @@ -385,10 +385,9 @@ def add_lockout(self, idx: int) -> None: # Check to make sure index is within the number of demodulators if idx < len(self.receiver.demodulators): # Lockout if not zero and not already locked out - demod_freq = self.receiver.demodulators[idx].center_freq + demod_freq: int = self.receiver.demodulators[idx].center_freq if (demod_freq != 0) and (demod_freq not in self.lockout_channels): - self.lockout_channels = np.append(self.lockout_channels, - demod_freq) + self.lockout_channels.append(demod_freq) self._generate_gui_lockout_channels() @@ -400,7 +399,7 @@ def _frequency_to_baseband(self, freq: float) -> int: def _baseband_to_frequency(self, bb_freq: int) -> float: return (bb_freq + self.receiver.center_freq)/1E6 - def clear_lockout(self): + def clear_lockout(self) -> None: """Clears lockout channels and updates GUI list """ # Clear the lockout channels @@ -424,7 +423,7 @@ def clear_lockout(self): self._generate_gui_lockout_channels() - def update_priority(self): + def update_priority(self) -> None: """Updates priority channels """ # Clear the priority channels @@ -434,9 +433,7 @@ def update_priority(self): if self.priority_file_name != "": # Open file, split to list, remove empty strings with open(self.priority_file_name) as priority_file: - lines = priority_file.read().splitlines() - priority_file.close() - lines = builtins.filter(None, lines) + lines = list(filter(str.rstrip, priority_file)) # Convert to baseband frequencies, round, and append if within BW for freq in lines: bb_freq = float(freq) - self.center_freq @@ -449,7 +446,7 @@ def update_priority(self): else: pass - def set_center_freq(self, center_freq): + def set_center_freq(self, center_freq: int) -> None: """Sets RF center frequency of hardware and clears lockout channels Sets low and high demod frequency limits based on provided bounds in command line @@ -466,7 +463,7 @@ def set_center_freq(self, center_freq): # Clear the lockout since frequency is changing self.clear_lockout() - def filter_and_set_gains(self, all_gains): + def filter_and_set_gains(self, all_gains: list[dict]) -> list[dict]: """Set the supported gains and return them Args: @@ -475,7 +472,7 @@ def filter_and_set_gains(self, all_gains): self.gains = self.receiver.filter_and_set_gains(all_gains) return self.gains - def set_gains(self, gains): + def set_gains(self, gains: list[dict]) -> list[dict]: """Set all the gains Args: @@ -484,39 +481,39 @@ def set_gains(self, gains): self.gains = self.receiver.set_gains(gains) return self.gains - def set_squelch(self, squelch_db): + def set_squelch(self, squelch_db: int) -> None: """Sets squelch of all demodulators Args: - squelch_db (float): Squelch in dB + squelch_db (int): Squelch in dB """ self.receiver.set_squelch(squelch_db) self.squelch_db = self.receiver.squelch_db - def set_volume(self, volume_db): + def set_volume(self, volume_db: int) -> None: """Sets volume of all demodulators Args: - volume_db (float): Volume in dB + volume_db (int): Volume in dB """ self.receiver.set_volume(volume_db) self.volume_db = self.receiver.volume_db - def set_threshold(self, threshold_db): + def set_threshold(self, threshold_db: int) -> None: """Sets threshold in dB for channel detection Args: - threshold_db (float): Threshold in dB + threshold_db (int): Threshold in dB """ self.threshold_db = threshold_db - def stop(self): + def stop(self) -> None: """Stop the receiver """ self.receiver.stop() self.receiver.wait() - def clean_up(self): + def clean_up(self) -> None: # cleanup terminating all demodulators for demod in self.receiver.demodulators: demod.set_center_freq(0, 0)