Skip to content

Commit

Permalink
Merge pull request #123 from NTIA/SEA-234_ubuntu22.04_python3.10_ray2.10
Browse files Browse the repository at this point in the history
Update Django, Ray, Ubuntu
  • Loading branch information
aromanielloNTIA authored Nov 7, 2024
2 parents 23761e7 + 617d04f commit 177d6a1
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
default_language_version:
python: python3.8
python: python3.10
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,4 +414,5 @@ See [LICENSE](LICENSE.md).

## Contact

For technical questions about SCOS Actions, contact Justin Haze, [[email protected]](mailto:[email protected])
For technical questions about SCOS Actions, contact the
[ITS Spectrum Monitoring Team](mailto:[email protected]).
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ classifiers = [

dependencies = [
"environs>=9.5.0",
"django>=3.2.18,<4.0",
"django>=4.2,<5.0",
"its_preselector @ git+https://github.com/NTIA/[email protected]",
"msgspec>=0.16.0,<1.0.0",
"numexpr>=2.8.3",
"numpy>=1.22.0",
"psutil>=5.9.4",
"python-dateutil>=2.0",
"ray>=2.6.3,<2.8.0",
"ray>=2.10.0",
"ruamel.yaml>=0.15",
"scipy>=1.8.0",
"sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive",
Expand All @@ -65,7 +65,7 @@ test = [
dev = [
"hatchling>=1.14.1,<2.0",
"pre-commit>=3.3.1,<4.0",
"ray[default]>=2.4.0",
"ray[default]>=2.10.0",
"scos-actions[test]",
]

Expand Down
2 changes: 1 addition & 1 deletion scos_actions/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "10.0.2"
__version__ = "11.0.0"
76 changes: 41 additions & 35 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE)
FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy")
IMPEDANCE_OHMS = 50.0
NUM_ACTORS = 3 # Number of ray actors to initialize
NUM_ACTORS = env.int("RAY_WORKERS", default=3) # Number of ray actors to initialize

# Create power detectors
TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["max", "mean"])
Expand Down Expand Up @@ -417,14 +417,10 @@ def __init__(self, params: dict, iir_sos: np.ndarray):
self.apd_worker = AmplitudeProbabilityDistribution.remote(
params[APD_BIN_SIZE_DB], params[APD_MIN_BIN_DBM], params[APD_MAX_BIN_DBM]
)
self.workers = [
self.fft_worker,
self.pvt_worker,
self.pfp_worker,
self.apd_worker,
]

del params

@ray.method(num_returns=4)
def run(self, iqdata: np.ndarray) -> list:
"""
Filter the input IQ data and concurrently compute FFT, PVT, PFP, and APD results.
Expand All @@ -436,9 +432,11 @@ def run(self, iqdata: np.ndarray) -> list:
# Filter IQ and place it in the object store
iqdata = ray.put(sosfilt(sos=self.iir_sos, x=iqdata))
# Compute PSD, PVT, PFP, and APD concurrently.
# Do not wait until they finish. Yield references to their results.
yield [worker.run.remote(iqdata) for worker in self.workers]
del iqdata
fft_reference = self.fft_worker.run.remote(iqdata)
pvt_reference = self.pvt_worker.run.remote(iqdata)
pfp_reference = self.pfp_worker.run.remote(iqdata)
apd_reference = self.apd_worker.run.remote(iqdata)
return fft_reference, pvt_reference, pfp_reference, apd_reference


class NasctnSeaDataProduct(Action):
Expand Down Expand Up @@ -541,7 +539,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s")

# Collect all IQ data and spawn data product computation processes
dp_procs, cpu_speed, reference_points = [], [], []
data_products_refs, cpu_speed, reference_points = [], [], []
capture_tic = perf_counter()

for i, parameters in enumerate(self.iteration_params):
Expand All @@ -552,10 +550,10 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
)
# Start data product processing but do not block next IQ capture
tic = perf_counter()

dp_procs.append(
data_products_refs.append(
iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])
)

del measurement_result["data"]
toc = perf_counter()
logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s")
Expand Down Expand Up @@ -585,35 +583,43 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
[],
)
result_tic = perf_counter()
for channel_data_process in dp_procs:
# Retrieve object references for channel data
channel_data_refs = ray.get(channel_data_process)
channel_count = len(data_products_refs)
logger.debug(f"Have {channel_count} channel results")
for index in range(len(data_products_refs)):
logger.debug(f"Working on channel {index}")
channel_data = []
for i, data_ref in enumerate(channel_data_refs):
# Now block until the data is ready
data = ray.get(data_ref)
if i == 1:
# Power-vs-Time results, a tuple of arrays
data, summaries = data # Split the tuple
max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
median_ch_pwrs.append(DATA_TYPE(summaries[3]))
del summaries
if i == 3: # Separate condition is intentional
# APD result: append instead of extend,
# since the result is a single 1D array
channel_data.append(data)
else:
# For 2D arrays (PSD, PVT, PFP)
channel_data.extend(data)
# Now block until the data is ready
dp_refs_tuple = ray.get(data_products_refs[index])
psd_ref, pvt_ref, pfp_ref, apd_ref = dp_refs_tuple
psd_data = ray.get(psd_ref)
channel_data.extend(psd_data)

pvt_data = ray.get(pvt_ref)
# Power-vs-Time results, a tuple of arrays
data, summaries = pvt_data # Split the tuple
max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
median_ch_pwrs.append(DATA_TYPE(summaries[3]))
channel_data.extend(data)
del summaries

pfp_data = ray.get(pfp_ref)
channel_data.extend(pfp_data)

# APD result: append instead of extend,
# since the result is a single 1D array
apd_data = ray.get(apd_ref)
channel_data.append(apd_data)

toc = perf_counter()
logger.debug(f"Waited {toc-tic} s for channel data")
all_data.extend(NasctnSeaDataProduct.transform_data(channel_data))

for ray_actor in iq_processors:
ray.kill(ray_actor)
result_toc = perf_counter()
del dp_procs, iq_processors, channel_data, channel_data_refs
del iq_processors, channel_data
logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s")

# Build metadata and convert data to compressed bytes
Expand Down
5 changes: 2 additions & 3 deletions scos_actions/hardware/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ def get_current_cpu_clock_speed() -> float:
:return:
"""
try:
out = subprocess.run("lscpu | grep 'MHz'", shell=True, capture_output=True)
spd = str(out.stdout).split("\\n")[0].split()[2]
return float(spd)
cpu_freq = psutil.cpu_freq()
return cpu_freq.current
except Exception as e:
logger.error("Unable to retrieve current CPU speed")
raise e
Expand Down

0 comments on commit 177d6a1

Please sign in to comment.