Skip to content

Commit

Permalink
tests: validate last sample hold
Browse files Browse the repository at this point in the history
- The test was introduced to validate the new feature
of the aout interface.

Signed-off-by: Adrian Stanea <[email protected]>
  • Loading branch information
Adrian-Stanea committed Aug 20, 2024
1 parent 8c75231 commit 8b42898
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 16 deletions.
214 changes: 212 additions & 2 deletions tests/analog_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import random
import sys
import reset_def_values as reset
from helpers import get_result_files, save_data_to_csv, plot_to_file
from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file
from open_context import ctx_timeout, ctx
from create_files import results_file, results_dir, csv

from shapefile import shape_gen, Shape

# dicts that will be saved to csv files
shape_csv_vals = {}
ampl_csv_vals = {}
Expand Down Expand Up @@ -1265,4 +1267,212 @@ def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude=
f'buffer_glitch_plot_ch{channel}_{waveform}.png',
data_marked=filtered_peaks)

return num_peaks
return num_peaks


def get_experiment_config_for_sample_hold(dac_sr):
cfg = {}
if dac_sr == 75_000_000:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 100_000_000
cfg["buffer_size"] = 20_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 1024 * 8
cfg["offset"] = 0
elif dac_sr == 7_500_000:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 100_000_000
cfg["buffer_size"] = 30_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 1024
cfg["offset"] = 0
elif dac_sr == 750_000:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 10_000_000
cfg["buffer_size"] = 30_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 1024
cfg["offset"] = 0
elif dac_sr == 75_000:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 1_000_000
cfg["buffer_size"] = 30_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 1024
cfg["offset"] = 0
elif dac_sr == 7_500:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 1_000_000
cfg["buffer_size"] = 30_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 128
cfg["offset"] = 0
# 750 Hz ommited to avoid long test duration
else:
raise ValueError("Invalid DAC sample rate.")
return cfg


def test_last_sample_hold(channel, ain : libm2k.M2kAnalogIn, aout: libm2k.M2kAnalogOut, trig: libm2k.M2kHardwareTrigger, ctx:libm2k.M2k, cfg):
def step_ramp_rising(aout_chn, trig_chn, buffer_ramp_up):
set_trig(trig, trig_chn, 8192, libm2k.RISING_EDGE_ANALOG, -cfg.get("trig_threshold"))
ain.startAcquisition(cfg.get("buffer_size"))
if aout_chn is None:
aout.push([buffer_ramp_up, buffer_ramp_up])
else:
aout.push(aout_chn, buffer_ramp_up)
data = np.array(ain.getSamples(cfg.get("buffer_size")))
# Flush values from previous buffer
ain.stopAcquisition()
return data

def step_ramp_falling(aout_chn, trig_chn, buffer_ramp_down):
set_trig(trig, trig_chn, 8192, libm2k.FALLING_EDGE_ANALOG, cfg.get("trig_threshold"))
ain.startAcquisition(cfg.get("buffer_size"))
if aout_chn is None:
aout.push([buffer_ramp_down, buffer_ramp_down])
else:
aout.push(aout_chn, buffer_ramp_down)
data = np.array(ain.getSamples(cfg.get("buffer_size")))
# Flush values from previous buffer
ain.stopAcquisition()
return data

def check_for_glitch(data, threshold=0.3):
"""
* The glitch is unwanted and happened in between the last sample of the previous buffer and the first sample of the new buffer.
* NOTE: At DAC_SR <= 7.5 KHz we see oscilations due to the response of the HDL filter
"""
glitch_found = False
for chn_samples in data:
if any(abs(left - right) >= threshold for left, right in zip(chn_samples, chn_samples[1:])):
glitch_found = True
return glitch_found

def are_values_within_range(chn, data: np.ndarray, lower_bound, upper_bound):
# TODO: When only 1 channel is used verify that the other channels stays at 0V. Currently after a stop and start it goes back to last sample. We expect 0V since stop destroys the buffer.
assert lower_bound < upper_bound, "Invalid bounds"

is_CH0_in_range = np.all((lower_bound <= data[0]) & (data[0] <= upper_bound))
is_CH1_in_range = np.all((lower_bound <= data[1]) & (data[1] <= upper_bound))
if channel is None:
return is_CH0_in_range and is_CH1_in_range
elif channel == libm2k.ANALOG_IN_CHANNEL_1:
return is_CH0_in_range
elif channel == libm2k.ANALOG_IN_CHANNEL_2:
return is_CH1_in_range
else:
raise ValueError(f"Unknown channel: {channel}")

file_name, dir_name, csv_path = get_result_files(gen_reports)
test_name = "sample_hold"
data_string = []

chn_str = "both_channels" if channel is None else f"CH{channel}"
sr_str = get_sample_rate_display_format(cfg.get("dac_sr"))
x_time, x_label = get_time_format(cfg.get("buffer_size"), cfg.get("adc_sr"))

if gen_reports:
subdir_name = f"{dir_name}/last_sample_hold/{chn_str}"
os.makedirs(subdir_name, exist_ok=True)

SLEEP = 0.15
glitched = False
is_last_sample_hold_ok = True # Assume it is ok until proven otherwise
assert channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2, None], "Invalid channel ... None means use both channels"
trig_chn = libm2k.ANALOG_IN_CHANNEL_1 if channel is None else channel

buffer_ramp_up = shape_gen(n=cfg["samples_per_period"],
amplitude=cfg["amplitude"],
offset=cfg["offset"])[Shape.RISING_RAMP.value]
buffer_ramp_down = shape_gen(n=cfg["samples_per_period"],
amplitude=cfg["amplitude"],
offset=cfg["offset"])[Shape.FALLING_RAMP.value]

ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True)
ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True)
ain.setSampleRate(cfg.get("adc_sr"))
ain.setRange(0, libm2k.PLUS_MINUS_25V)
ain.setRange(1, libm2k.PLUS_MINUS_25V)

aout.setSampleRate(0, cfg.get("dac_sr"))
aout.setSampleRate(1, cfg.get("dac_sr"))
aout.setKernelBuffersCount(0, 4)
aout.setKernelBuffersCount(1, 4)
aout.enableChannel(0, True)
aout.enableChannel(1, True)
aout.setCyclic(False)

# Alternate between rising and falling ramps: rising, falling, rising, falling
# 1: Rising
data = step_ramp_rising(channel, trig_chn, buffer_ramp_up)
print(data.shape)
# Shoud hold last sample from new buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(channel, data[:, -2000:], cfg["amplitude"] * 0.90, cfg["amplitude"] * 1.10)
if gen_reports:
plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Rising Ramp",
data=data[0],
data1=data[1],
x_data=x_time,
xlabel = x_label,
dir_name=subdir_name,
y_lim=(-6, 6),
filename=f"last_sample_hold_{chn_str}_{sr_str}_step1.png")
time.sleep(SLEEP) # wait for the DAC output to settle with last sample
# 2: Falling
data = step_ramp_falling(channel, trig_chn, buffer_ramp_down)
# Shoud start with last sample from previous buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(channel, data[:, :2000], cfg["amplitude"] * 0.90, cfg["amplitude"] * 1.10)
# Shoud hold last sample from new buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(channel, data[:, -2000:], -cfg["amplitude"] * 1.10, -cfg["amplitude"] * 0.90)
glitched = glitched or check_for_glitch(data)
if gen_reports:
plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Falling Ramp",
data=data[0],
data1=data[1],
x_data=x_time,
xlabel = x_label,
dir_name=subdir_name,
y_lim=(-6, 6),
filename=f"last_sample_hold_{chn_str}_{sr_str}_step2.png")
time.sleep(SLEEP) # wait for the DAC output to settle with last sample
# 3: Rising
data = step_ramp_rising(channel, trig_chn, buffer_ramp_up)
# Shoud start with last sample from previous buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(channel, data[:, :2000], -cfg["amplitude"] * 1.10, -cfg["amplitude"] * 0.90)
# Shoud hold last sample from new buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(channel, data[:, -2000:], cfg["amplitude"] * 0.90, cfg["amplitude"] * 1.10)
glitched = glitched or check_for_glitch(data)
if gen_reports:
plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Rising Ramp",
data=data[0],
data1=data[1],
x_data=x_time,
xlabel = x_label,
dir_name=subdir_name,
y_lim=(-6, 6),
filename=f"last_sample_hold_{chn_str}_{sr_str}_step3.png")
time.sleep(SLEEP) # wait for the DAC output to settle with last sample
# 4: Falling
data = step_ramp_falling(channel, trig_chn, buffer_ramp_down)
# Shoud start with last sample from previous buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(channel, data[:, :2000], cfg["amplitude"] * 0.90, cfg["amplitude"] * 1.10)
# Shoud hold last sample from new buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(channel, data[:, -2000:], -cfg["amplitude"] * 1.10, -cfg["amplitude"] * 0.90)
glitched = glitched or check_for_glitch(data)
if gen_reports:
plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Falling Ramp",
data=data[0],
data1=data[1],
x_data=x_time,
xlabel = x_label,
dir_name=subdir_name,
y_lim=(-6, 6),
filename=f"last_sample_hold_{chn_str}_{sr_str}_step4.png")
aout.stop()
return glitched, is_last_sample_hold_ok
52 changes: 47 additions & 5 deletions tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

import numpy as np
from pandas import DataFrame
import matplotlib.pyplot as plt

Expand Down Expand Up @@ -27,7 +28,7 @@ def save_data_to_csv(csv_vals, csv_file):
return


def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None, data_marked=None):
def plot_to_file(title, data, dir_name, filename, xlabel=None, x_lim = None, ylabel=None, y_lim = None, data1=None, x_data = None, data_marked=None):
# Saves the plots in a separate folder
# Arguments:
# title -- Title of the plot
Expand All @@ -40,7 +41,8 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data
# data_marked -- Data that represents specific points on the plot(default: {None})
# plot the signals in a separate folder
plt.title(title)
if xlabel is not None: # if xlabel and ylabel are not specified there will be default values
# if xlabel and ylabel are not specified there will be default values
if xlabel is not None:
plt.xlabel(xlabel)
else:
plt.xlabel('Samples')
Expand All @@ -49,11 +51,51 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data
else:
plt.ylabel('Voltage [V]')
plt.grid(visible=True)
plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case)
# if x_data is not None, the plot will be displayed with the specified x_data
if x_data is not None:
plt.plot(x_data, data)
else:
plt.plot(data)
# if a second set of data must be printed (for ch0 and ch1 phase difference in this case)
if data1 is not None:
plt.plot(data1)
if x_data is not None:
plt.plot(x_data, data1)
else:
plt.plot(data1)
# Optional configurations
if x_lim is not None:
plt.xlim(*x_lim)
if y_lim is not None:
plt.ylim(*y_lim)
if data_marked is not None:
plt.plot(data_marked, data[data_marked], 'xr')
plt.savefig(dir_name + "/" + filename)
plt.close()
return
return


def get_time_format(samples, sample_rate):
x_time = np.linspace(0, samples/sample_rate, samples)

if x_time[-1] < 1e-6:
x_time *= 1e9
x_label = "Time [ns]"
elif x_time[-1] < 1e-3:
x_time *= 1e6
x_label = "Time [us]"
elif x_time[-1] < 1:
x_time *= 1e3
x_label = "Time [ms]"
else:
x_label = "Time [s]"
return x_time, x_label


def get_sample_rate_display_format(sample_rate):
if sample_rate < 1e3:
return f"{sample_rate:.2f} Hz"
if sample_rate < 1e6:
return f"{sample_rate/1e3:.2f} KHz"
if sample_rate < 1e9:
return f"{sample_rate/1e6:.2f} MHz"
return f"{sample_rate/1e9:.2f} GHz"
31 changes: 29 additions & 2 deletions tests/m2k_analog_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import libm2k

from shapefile import shape_gen, ref_shape_gen, shape_name
from analog_functions import test_amplitude, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \
from analog_functions import get_experiment_config_for_sample_hold, test_amplitude, test_last_sample_hold, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \
test_voltmeter_functionality, test_kernel_buffers, test_buffer_transition_glitch
from analog_functions import noncyclic_buffer_test, set_samplerates_for_shapetest, set_trig_for_cyclicbuffer_test, \
test_calibration
Expand All @@ -15,6 +15,8 @@
import logger
from repeat_test import repeat

from helpers import get_sample_rate_display_format


class A_AnalogTests(unittest.TestCase):
# Class Where are defined all test methods for AnalogIn, AnalogOut, AnalogTrigger
Expand Down Expand Up @@ -251,4 +253,29 @@ def test_buffer_transition_glitch(self):
num_glitches = test_buffer_transition_glitch(channel, ain, aout, trig, waveform)

with self.subTest(msg='Test buffer transition glitch: ' + waveform + ' on ch' + str(channel)):
self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel))
self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel))

# @unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33', 'The sample and hold feature is available starting with firmware v0.33. Note: v0.32 had a glitch that is handled in this test.')
def test_last_sample_hold(self):
"""
Tests the last sample hold functionality for different channels and DAC sample rates.
This test iterates over different channels (each channel individually and both channels together)
and then tests the last sample hold functionality. When testing both channels together, 'None'
is used to denote this case.
It verifies that the last sample is held correctly and that there are no glitches in the output signal in between the last sample and a new push.
"""
for channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2, None]:
for dac_sr in [75_000_000, 7_500_000, 750_000, 75_000, 7_500]:
cfg = get_experiment_config_for_sample_hold(dac_sr)
sr_format = get_sample_rate_display_format(cfg.get("dac_sr"))
chn_str = "both_channels" if channel is None else f"CH{channel}"

reset.analog_in(ain)
reset.analog_out(aout)
reset.trigger(trig)
has_glitch, is_last_sample_hold_ok = test_last_sample_hold(channel, ain, aout, trig, ctx, cfg)
with self.subTest(msg='Test last sample hold on ' + str(chn_str) + ' with DAC SR ' + str(cfg.get("dac_sr"))):
self.assertEqual(has_glitch, False, f'Found glitches on {chn_str} with DAC SR {sr_format}')
self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}')
3 changes: 2 additions & 1 deletion tests/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def wait_():
"test_shapes_ch0\n"
"test_shapes_ch1\n"
"test_voltmeter\n"
"test_buffer_transition_glitch\n")
"test_buffer_transition_glitch\n"
"test_last_sample_hold\n")
print("\n ===== class B_TriggerTests ===== \n")
print(" ===== tests ====== \n")
print("test_1_trigger_object\n"
Expand Down
Loading

0 comments on commit 8b42898

Please sign in to comment.