Skip to content

Commit

Permalink
Merge pull request #117 from NTIA/revert-110-SEA-178_test_actions_loa…
Browse files Browse the repository at this point in the history
…ding

Revert "Sea 178 test actions loading"
  • Loading branch information
dboulware authored Apr 26, 2024
2 parents ed7679f + 80c9982 commit 2c72866
Show file tree
Hide file tree
Showing 21 changed files with 112 additions and 106 deletions.
2 changes: 1 addition & 1 deletion scos_actions/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "10.0.0"
__version__ = "9.0.0"
2 changes: 2 additions & 0 deletions scos_actions/actions/calibrate_y_factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,14 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug(f"cal_params: {cal_params}")
cal_data = dict()
last_cal_datetime = get_datetime_str_now()
clock_rate_lookup_by_sample_rate = []
self.sensor.sensor_calibration = SensorCalibration(
calibration_parameters=cal_params,
calibration_data=cal_data,
calibration_reference=onboard_cal_reference,
file_path=Path(env("ONBOARD_CALIBRATION_FILE")),
last_calibration_datetime=last_cal_datetime,
clock_rate_lookup_by_sample_rate=clock_rate_lookup_by_sample_rate,
sensor_uid=sensor_uid,
)
elif self.sensor.sensor_calibration.file_path == env(
Expand Down
19 changes: 9 additions & 10 deletions scos_actions/actions/interfaces/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,15 @@ def configure_sigan(self, params: dict):

def configure_preselector(self, params: dict):
preselector = self.sensor.preselector
if self.sensor.has_configurable_preselector:
if self.PRESELECTOR_PATH_KEY in params:
path = params[self.PRESELECTOR_PATH_KEY]
logger.debug(f"Setting preselector RF path: {path}")
preselector.set_state(path)
else:
# Require the RF path to be specified if the sensor has a preselector.
raise ParameterException(
f"No {self.PRESELECTOR_PATH_KEY} value specified in the YAML config."
)
if self.PRESELECTOR_PATH_KEY in params:
path = params[self.PRESELECTOR_PATH_KEY]
logger.debug(f"Setting preselector RF path: {path}")
preselector.set_state(path)
elif self.sensor.has_configurable_preselector:
# Require the RF path to be specified if the sensor has a preselector.
raise ParameterException(
f"No {self.PRESELECTOR_PATH_KEY} value specified in the YAML config."
)
else:
# No preselector in use, so do not require an RF path
pass
Expand Down
7 changes: 3 additions & 4 deletions scos_actions/actions/interfaces/measurement_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,15 @@ def get_calibration(self, measurement_result: dict) -> ntia_sensor.Calibration:
noise_figure=round(
measurement_result["applied_calibration"]["noise_figure"], 3
),
temperature=round(
self.sensor.sensor_calibration_data["temperature"], 1
),
reference=measurement_result["reference"],
)
if "compression_point" in measurement_result["applied_calibration"]:
cal_meta.compression_point = measurement_result["applied_calibration"][
"compression_point"
]
if "temperature" in self.sensor.sensor_calibration_data:
cal_meta.temperature = round(
self.sensor.sensor_calibration_data["temperature"], 1
)
return cal_meta

def create_metadata(
Expand Down
34 changes: 34 additions & 0 deletions scos_actions/actions/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""A simple example action that logs a message."""

import logging
from typing import Optional

from scos_actions.actions.interfaces.action import Action
from scos_actions.hardware.sensor import Sensor

logger = logging.getLogger(__name__)

LOGLVL_INFO = 20
LOGLVL_ERROR = 40


class Logger(Action):
"""Log the message "running test {name}/{tid}".
This is useful for testing and debugging.
`{name}` will be replaced with the parent schedule entry's name, and
`{tid}` will be replaced with the sequential task id.
"""

def __init__(self, loglvl=LOGLVL_INFO):
super().__init__(parameters={"name": "logger"})
self.loglvl = loglvl

def __call__(self, sensor: Optional[Sensor], schedule_entry: dict, task_id: int):
msg = "running test {name}/{tid}"
schedule_entry_name = schedule_entry["name"]
logger.log(
level=self.loglvl, msg=msg.format(name=schedule_entry_name, tid=task_id)
)
4 changes: 2 additions & 2 deletions scos_actions/actions/sync_gps.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ def __init__(self, parameters: dict):
def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug("Syncing to GPS")
self.sensor = sensor
dt = self.sensor.gps.get_gps_time(self.sensor)
dt = self.sensor.gps.get_gps_time()
date_cmd = ["date", "-s", "{:}".format(dt.strftime("%Y/%m/%d %H:%M:%S"))]
subprocess.check_output(date_cmd, shell=True)
logger.info(f"Set system time to GPS time {dt.ctime()}")

location = sensor.gps.get_location(self.sensor)
location = sensor.gps.get_location()
if location is None:
raise RuntimeError("Unable to synchronize to GPS")

Expand Down
8 changes: 8 additions & 0 deletions scos_actions/calibration/sensor_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@ class provides an implementation for the update method to allow calibration
"""

last_calibration_datetime: str
clock_rate_lookup_by_sample_rate: List[Dict[str, float]]
sensor_uid: str

def get_clock_rate(self, sample_rate: Union[float, int]) -> Union[float, int]:
"""Find the clock rate (Hz) using the given sample_rate (samples per second)"""
for mapping in self.clock_rate_lookup_by_sample_rate:
if mapping["sample_rate"] == sample_rate:
return mapping["clock_frequency"]
return sample_rate

def update(
self,
params: dict,
Expand Down
1 change: 1 addition & 0 deletions scos_actions/calibration/tests/test_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def test_to_and_from_json(self, tmp_path: Path):
"testing",
tmp_path / "testing.json",
"dt_str",
[],
"uid",
)
sensor_cal.to_json()
Expand Down
22 changes: 22 additions & 0 deletions scos_actions/calibration/tests/test_sensor_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ def setup_calibration_file(self, tmp_path: Path):
cal_data["sensor_uid"] = "SAMPLE_CALIBRATION"
cal_data["calibration_reference"] = "TESTING"

# Add SR/CF lookup table
cal_data["clock_rate_lookup_by_sample_rate"] = []
for sr in self.sample_rates:
cr = sr
while cr <= 40e6:
cr *= 2
cr /= 2
cal_data["clock_rate_lookup_by_sample_rate"].append(
{"sample_rate": int(sr), "clock_frequency": int(cr)}
)

# Create the JSON architecture for the calibration data
cal_data["calibration_data"] = {}
cal_data["calibration_parameters"] = ["sample_rate", "frequency", "gain"]
Expand Down Expand Up @@ -140,6 +151,7 @@ def test_sensor_calibration_dataclass_fields(self):
# Note: does not check field order
assert fields == {
"last_calibration_datetime": str,
"clock_rate_lookup_by_sample_rate": List[Dict[str, float]],
"sensor_uid": str,
}

Expand All @@ -155,6 +167,13 @@ def test_field_validator(self):
[], {}, False, Path(""), datetime.datetime.now(), [], "uid"
)

def test_get_clock_rate(self):
"""Test the get_clock_rate method"""
# Test getting a clock rate by sample rate
assert self.sample_cal.get_clock_rate(10e6) == 40e6
# If there isn't an entry, the sample rate should be returned
assert self.sample_cal.get_clock_rate(-999) == -999

def test_get_calibration_dict_exact_match_lookup(self):
calibration_datetime = get_datetime_str_now()
calibration_params = ["sample_rate", "frequency"]
Expand All @@ -168,6 +187,7 @@ def test_get_calibration_dict_exact_match_lookup(self):
calibration_reference="testing",
file_path=Path(""),
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)
cal_data = cal.get_calibration_dict({"sample_rate": 100.0, "frequency": 200.0})
Expand All @@ -186,6 +206,7 @@ def test_get_calibration_dict_within_range(self):
calibration_reference="testing",
file_path=Path("test_calibration.json"),
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)

Expand Down Expand Up @@ -213,6 +234,7 @@ def test_update(self):
calibration_reference="testing",
file_path=test_cal_path,
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)
action_params = {"sample_rate": 100.0, "frequency": 200.0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ stepped_frequency_time_domain_iq:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
nasctn_sea_data_product:
name: test_SEA_CBRS_Measure_Baseline
name: test_nasctn_sea_data_product
rf_path: antenna
calibration_adjust: False
# IIR filter settings
iir_apply: True
iir_gpass_dB: 0.1 # Max passband ripple below unity gain
iir_gstop_dB: 40 # Minimum stopband attenuation
iir_pb_edge_Hz: 5e6 # Passband edge frequency
iir_sb_edge_Hz: 5.008e6 # Stopband edge frequency
# FFT settings
# Mean/Max FFT settings
fft_size: 175
nffts: 320e3
fft_window_type: flattop # See scipy.signal.get_window for supported input
# PFP frame
pfp_frame_period_ms: 10
# APD downsampling settings
apd_bin_size_dB: 1.0 # Set to 0 or negative for no downsampling
apd_max_bin_dBm: -30
apd_bin_size_dB: 0.5 # Set to 0 or negative for no downsampling
apd_min_bin_dBm: -180
apd_max_bin_dBm: -30
# Time domain power statistics settings
td_bin_size_ms: 10
# Round all power results to X decimal places
round_to_places: 2
# Sigan Settings
preamp_enable: True
reference_level: -25
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ single_frequency_time_domain_iq:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@ single_frequency_fft:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
1 change: 0 additions & 1 deletion scos_actions/configs/actions/test_survey_iq_action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,3 @@ stepped_frequency_time_domain_iq:
- 10000
nskip: 15.36e4
calibration_adjust: False
rf_path: antenna
33 changes: 15 additions & 18 deletions scos_actions/discover/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
from scos_actions.actions import action_classes
from scos_actions.actions.logger import Logger
from scos_actions.actions.monitor_sigan import MonitorSignalAnalyzer
from scos_actions.actions.sync_gps import SyncGps
from scos_actions.discover.yaml import load_from_yaml
from scos_actions.settings import ACTION_DEFINITIONS_DIR, SIGAN_CLASS, SIGAN_MODULE
from scos_actions.settings import ACTION_DEFINITIONS_DIR

actions = {}
test_actions = {}
actions = {
"logger": Logger(),
}
test_actions = {
"test_sync_gps": SyncGps(parameters={"name": "test_sync_gps"}),
"test_monitor_sigan": MonitorSignalAnalyzer(
parameters={"name": "test_monitor_sigan"}
),
"logger": Logger(),
}


def init(
Expand All @@ -22,18 +31,6 @@ def init(
return yaml_actions, yaml_test_actions


if (
SIGAN_MODULE == "scos_actions.hardware.mocks.mock_sigan"
and SIGAN_CLASS == "MockSignalAnalyzer"
):
yaml_actions, yaml_test_actions = init()
actions.update(yaml_actions)
test_actions.update(
{
"test_sync_gps": SyncGps(parameters={"name": "test_sync_gps"}),
"test_monitor_sigan": MonitorSignalAnalyzer(
parameters={"name": "test_monitor_sigan"}
),
}
)
test_actions.update(yaml_test_actions)
yaml_actions, yaml_test_actions = init()
actions.update(yaml_actions)
test_actions.update(yaml_test_actions)
6 changes: 2 additions & 4 deletions scos_actions/hardware/gps_iface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@

class GPSInterface(ABC):
@abstractmethod
def get_location(
self, sensor: "scos_actions.hardware.sensor.Sensor", timeout_s: float = 1
):
def get_location(self, timeout_s=1):
pass

@abstractmethod
def get_gps_time(self, sensor: "scos_actions.hardware.sensor.Sensor"):
def get_gps_time(self):
pass
5 changes: 2 additions & 3 deletions scos_actions/hardware/mocks/mock_gps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@


class MockGPS(GPSInterface):

def get_location(self, sensor, timeout_s=1):
def get_location(timeout_s=1):
logger.warning("Using mock GPS!")
return 39.995118, -105.261572, 1651.0

def get_gps_time(self, sensor):
def get_gps_time(self):
logger.warning("Using mock GPS!")
return datetime.now()
6 changes: 0 additions & 6 deletions scos_actions/hardware/mocks/mock_sigan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import numpy as np

from scos_actions import __package__ as SCOS_ACTIONS_NAME
from scos_actions import __version__ as SCOS_ACTIONS_VERSION
from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface
from scos_actions.utils import get_datetime_str_now
Expand Down Expand Up @@ -43,7 +42,6 @@ def __init__(
self._reference_level = -30
self._is_available = True
self._plugin_version = SCOS_ACTIONS_VERSION
self._plugin_name = SCOS_ACTIONS_NAME
self._firmware_version = "1.2.3"
self._api_version = "v1.2.3"

Expand All @@ -62,10 +60,6 @@ def is_available(self):
def plugin_version(self):
return self._plugin_version

@property
def plugin_name(self):
return self._plugin_name

@property
def sample_rate(self):
return self._sample_rate
Expand Down
Loading

0 comments on commit 2c72866

Please sign in to comment.