From 2667ec69ebe105ceb10e744ff784546b025e08df Mon Sep 17 00:00:00 2001 From: Josh Bailey Date: Wed, 9 Aug 2023 16:29:48 +1200 Subject: [PATCH] detect scanner config changes and restart automatically. --- gamutrf/waterfall.py | 268 ++++++++++++++++++++++++---------------- tests/test_waterfall.py | 3 +- 2 files changed, 161 insertions(+), 110 deletions(-) diff --git a/gamutrf/waterfall.py b/gamutrf/waterfall.py index e63a0760..c4e0232c 100644 --- a/gamutrf/waterfall.py +++ b/gamutrf/waterfall.py @@ -399,20 +399,6 @@ def meshgrid(config, start, stop, num): ) -def init_state( - config, - state, -): - state.X, state.Y = meshgrid( - config, 1, config.waterfall_height, config.waterfall_height - ) - state.freq_bins = state.X[0] - state.db_data = np.empty(state.X.shape) - state.db_data.fill(np.nan) - state.freq_data = np.empty(state.X.shape) - state.freq_data.fill(np.nan) - - def init_fig( config, state, @@ -539,39 +525,14 @@ def draw_peaks( state.ax_psd.draw_artist(txt) -def update_fig(config, state, zmqr, rotate_secs, save_time, scan_configs, scan_df): +def update_fig(config, state, results): if not state.fig or not state.ax: raise NotImplementedError - if scan_df is not None: - results = [ - (scan_configs, frame_resample(scan_df, config.freq_resolution * 1e6)) - ] - else: - results = [] - while True: - scan_configs, scan_df = zmqr.read_buff( - scan_fres=config.freq_resolution * 1e6 - ) - if scan_df is None: - break - scan_df = scan_df[ - (scan_df.freq >= config.min_freq) & (scan_df.freq <= config.max_freq) - ][: config.waterfall_width] - if scan_df.empty: - logging.info( - f"Scan is outside specified frequency range ({config.min_freq} to {config.max_freq})." - ) - continue - results.append((scan_configs, scan_df)) - - if not results: - return False - - if config.base_save_path and rotate_secs: + if config.base_save_path and config.rotate_secs: state.save_path = os.path.join( config.base_save_path, - str(int(time.time() / rotate_secs) * rotate_secs), + str(int(time.time() / config.rotate_secs) * config.rotate_secs), ) if not os.path.exists(state.save_path): Path(state.save_path).mkdir(parents=True, exist_ok=True) @@ -583,7 +544,7 @@ def update_fig(config, state, zmqr, rotate_secs, save_time, scan_configs, scan_d idx = ( ((scan_df.freq - config.min_freq) / config.freq_resolution) .round() - .clip(lower=0, upper=(scan_df.shape[0] - 1)) + .clip(lower=0, upper=(state.db_data.shape[1] - 1)) .values.flatten() .astype(int) ) @@ -716,13 +677,11 @@ def update_fig(config, state, zmqr, rotate_secs, save_time, scan_configs, scan_d if state.save_path: save_waterfall( state, - save_time, + config.save_time, scan_time, fig_path=fig_path, ) - return True - class WaterfallConfig: def __init__( @@ -741,6 +700,8 @@ def __init__( waterfall_height, waterfall_width, batch, + rotate_secs, + save_time, ): self.engine = engine self.plot_snr = plot_snr @@ -770,15 +731,20 @@ def __init__( ) self.freq_resolution = self.freq_range / self.waterfall_width self.n_ticks = 20 + self.rotate_secs = rotate_secs + self.save_time = save_time + + def __eq__(self, other): + for attr in ("fft_len", "sampling_rate", "min_freq", "max_freq"): + if getattr(self, attr) != getattr(other, attr): + return False + return True class WaterfallState: - def __init__(self): + def __init__(self, config, save_path, peak_finder): self.db_min = -220 self.db_max = -150 - self.db_data = None - self.freq_data = None - self.freq_bins = None self.detection_text = [] self.scan_times = [] self.scan_config_history = {} @@ -791,8 +757,6 @@ def __init__(self): self.major_tick_separator = None self.cmap_psd = None self.cmap = None - self.X = None - self.Y = None self.fig = None self.top_n_lns = None self.background = None @@ -808,10 +772,63 @@ def __init__(self): self.current_psd_ln = None self.ax_psd = None self.ax = None - self.save_path = None + self.save_path = save_path self.mesh_psd = None - self.peak_finder = None + self.peak_finder = peak_finder self.last_plot = 0 + self.X, self.Y = meshgrid( + config, 1, config.waterfall_height, config.waterfall_height + ) + self.freq_bins = self.X[0] + self.db_data = np.empty(self.X.shape) + self.db_data.fill(np.nan) + self.freq_data = np.empty(self.X.shape) + self.freq_data.fill(np.nan) + + +def make_config( + scan_configs, + min_freq, + max_freq, + engine, + plot_snr, + savefig_path, + top_n, + base_save_path, + width, + height, + waterfall_height, + waterfall_width, + batch, + rotate_secs, + save_time, +): + sampling_rate = max([scan_config["sample_rate"] for scan_config in scan_configs]) + fft_len = max([scan_config["nfft"] for scan_config in scan_configs]) + if min_freq == 0: + min_freq = min([scan_config["freq_start"] for scan_config in scan_configs]) + if max_freq == 0: + max_freq = max([scan_config["freq_end"] for scan_config in scan_configs]) + + config = WaterfallConfig( + engine, + plot_snr, + savefig_path, + sampling_rate, + fft_len, + min_freq, + max_freq, + top_n, + base_save_path, + width, + height, + waterfall_height, + waterfall_width, + batch, + rotate_secs, + save_time, + ) + return config def waterfall( @@ -832,12 +849,16 @@ def waterfall( batch, zmqr, ): + style.use("fast") + global need_reset_fig need_reset_fig = True global running running = True + need_init = True + need_reconfig = True - def onresize(_event): + def onresize(_event): # nosemgrep global need_reset_fig need_reset_fig = True @@ -849,11 +870,10 @@ def sig_handler(_sig=None, _frame=None): signal.signal(signal.SIGTERM, sig_handler) logging.info("awaiting scanner startup") - while not zmqr.healthy(): time.sleep(0.1) - logging.info("awaiting config from scanner") + logging.info("awaiting initial config from scanner(s)") scan_configs = None scan_df = None while zmqr.healthy() and running: @@ -865,63 +885,95 @@ def sig_handler(_sig=None, _frame=None): if not scan_configs: return - sampling_rate = max([scan_config["sample_rate"] for scan_config in scan_configs]) - fft_len = max([scan_config["nfft"] for scan_config in scan_configs]) - if min_freq == 0: - min_freq = min([scan_config["freq_start"] for scan_config in scan_configs]) - if max_freq == 0: - max_freq = max([scan_config["freq_end"] for scan_config in scan_configs]) - - config = WaterfallConfig( - engine, - plot_snr, - savefig_path, - sampling_rate, - fft_len, - min_freq, - max_freq, - top_n, - base_save_path, - width, - height, - waterfall_height, - waterfall_width, - batch, - ) - - logging.info( - "scanning %fMHz to %fMHz at %fMsps with %u FFT points at %fMHz resolution", - config.min_freq, - config.max_freq, - config.sampling_rate / 1e6, - config.fft_len, - config.freq_resolution, - ) - - state = WaterfallState() - state.save_path = config.base_save_path - init_state(config, state) - state.peak_finder = peak_finder + while zmqr.healthy() and running: + if need_reconfig: + config = make_config( + scan_configs, + min_freq, + max_freq, + engine, + plot_snr, + savefig_path, + top_n, + base_save_path, + width, + height, + waterfall_height, + waterfall_width, + batch, + rotate_secs, + save_time, + ) + logging.info( + "scanning %fMHz to %fMHz at %fMsps with %u FFT points at %fMHz resolution", + config.min_freq, + config.max_freq, + config.sampling_rate / 1e6, + config.fft_len, + config.freq_resolution, + ) - matplotlib.use(config.engine) - style.use("fast") - init_fig(config, state, onresize) + state = WaterfallState(config, base_save_path, peak_finder) + matplotlib.use(config.engine) + results = [ + (scan_configs, frame_resample(scan_df, config.freq_resolution * 1e6)) + ] - while zmqr.healthy() and running: + need_reconfig = False + need_init = True + if need_init: + init_fig(config, state, onresize) + need_init = False + need_reset_fig = True if need_reset_fig: reset_fig(config, state) need_reset_fig = False - if not update_fig( - config, state, zmqr, rotate_secs, save_time, scan_configs, scan_df - ): + while True: + scan_configs, scan_df = zmqr.read_buff( + scan_fres=config.freq_resolution * 1e6 + ) + if scan_df is None: + break + last_config = make_config( + scan_configs, + min_freq, + max_freq, + engine, + plot_snr, + savefig_path, + top_n, + base_save_path, + width, + height, + waterfall_height, + waterfall_width, + batch, + rotate_secs, + save_time, + ) + if last_config != config: + logging.info("scanner config change detected") + results = [] + need_reconfig = True + break + scan_df = scan_df[ + (scan_df.freq >= config.min_freq) & (scan_df.freq <= config.max_freq) + ] + if scan_df.empty: + logging.info( + f"Scan is outside specified frequency range ({config.min_freq} to {config.max_freq})." + ) + continue + results.append((scan_configs, scan_df)) + if need_reconfig: + continue + if results: + update_fig(config, state, results) + if config.batch and state.counter % config.reclose_interval == 0: + need_init = True + results = [] + else: time.sleep(0.1) - # TODO: workaround memory leak in savefig with periodic reinitialiation - elif config.batch and state.counter % config.reclose_interval == 0: - init_fig(config, state, onresize) - need_reset_fig = True - scan_configs = None - scan_df = None - zmqr.stop() diff --git a/tests/test_waterfall.py b/tests/test_waterfall.py index 2b199c6c..2f017b62 100755 --- a/tests/test_waterfall.py +++ b/tests/test_waterfall.py @@ -10,7 +10,6 @@ import pandas as pd from gamutrf.waterfall import argument_parser, waterfall from gamutrf.peak_finder import get_peak_finder -from gamutrf.zmqreceiver import frame_resample class FakeZmqReceiver: @@ -43,7 +42,7 @@ def read_buff(self, scan_fres): df.loc[ (df.freq >= self.peak_min) & (df.freq <= self.peak_max), "db" ] = self.peak_val - df = frame_resample(df, scan_fres) + df["freq"] /= 1e6 self.serve_results = [ ( [