Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/validations fw 0.32 #353

Merged
merged 2 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 128 additions & 1 deletion tests/analog_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ def test_oversampling_ratio(channel, ain, aout, trig):
return test_osr


def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None):
def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None, data_marked=None):
# Saves the plots in a separate folder
# Arguments:
# title -- Title of the plot
Expand All @@ -1063,6 +1063,7 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data
# xlabel -- Label of x-Axis (default: {None})
# ylabel -- Label of y-Axis(default: {None})
# data1 -- Data that should be plotted on the same plot(default: {None})
# data_marked -- Data that represents specific points on the plot(default: {None})
# plot the signals in a separate folder
plt.title(title)
if xlabel is not None: # if xlabel and ylabel are not specified there will be default values
Expand All @@ -1077,6 +1078,8 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data
plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case)
if data1 is not None:
plt.plot(data1)
if data_marked is not None:
plt.plot(data_marked, data[data_marked], 'xr')
plt.savefig(dir_name + "/" + filename)
plt.close()
return
Expand Down Expand Up @@ -1195,6 +1198,8 @@ def write_file(file, test_name, channel, data_string):
file.write("\n\nTest of analog input and output with different signal shapes:\n")
elif test_name == "amplitude":
file.write("\n\nAmplitude test on channel " + str(channel) + ": \n")
elif test_name == "buffer_transition_glitch":
file.write("\n\nTest buffer transition glitch on channel " + str(channel) + ": \n")
for i in range(len(data_string)):
file.write(str(data_string[i]) + '\n')

Expand Down Expand Up @@ -1282,3 +1287,125 @@ def test_kernel_buffers(ain, trig, nb_kernel_buffers):
except:
error = True
return error

def compute_percentage_diff(v1, v2):
# https://www.oracle.com/webfolder/technetwork/data-quality/edqhelp/content/processor_library/matching/comparisons/percent_difference.htm
vals = sorted([v1, v2])
percentage_diff = 0
try:
percentage_diff = ((vals[1] - vals[0]) / vals[1])
except:
pass
return percentage_diff

def compute_y_distance(v1, v2):
lens = sorted([v1, v2])
distance = lens[1] - lens[0]
return distance

def is_spike(data, peak, threshold = 0.25):
# for sampling_frequency_in = 1_000_000 the center of of the glitch is at 75 samples distance with repect to the peak
dx_small = 75
dx_large = 200

prev_sample, next_sample = data[peak - dx_small], data[peak + dx_small]
step_inside_glitch_range = compute_y_distance(prev_sample, next_sample)
prev_sample, next_sample = data[peak - dx_large], data[peak + dx_large]
step_outside_glitch_range = compute_y_distance(prev_sample, next_sample)

percentage_dif = compute_percentage_diff(step_inside_glitch_range, step_outside_glitch_range)
return percentage_dif > threshold

def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude=1):
if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []

BUFFER_SIZE = 5_00_000

reset.analog_in(ain)
reset.analog_out(aout)
reset.trigger(trig)

test_name = "buffer_transition_glitch"
data_string = []

dac_sr = 75_000
adc_sr = 1_000_000

ain.setSampleRate(adc_sr)
ain.setRange(channel, libm2k.PLUS_MINUS_2_5V)

set_trig(trig, channel, 0, libm2k.RISING_EDGE_ANALOG, 0.1)

aout.setSampleRate(channel, dac_sr)
aout.enableChannel(channel, True)
aout.setCyclic(True)
ctx.setTimeout(10000)

out_samples = 4096
if waveform == 'sine':
offset = 0
data_high = amplitude* np.sin(np.linspace(offset, 2*np.pi + offset, out_samples ))
data_low = -amplitude* np.sin(np.linspace(offset, 2*np.pi + offset, out_samples))
if waveform == 'dc':
data_high = [amplitude] * out_samples
data_low = [-amplitude] * out_samples

ain.startAcquisition(BUFFER_SIZE)
for _ in range(5):
aout.push(channel, data_high)
time.sleep(0.1)
aout.push(channel, data_low)
time.sleep(0.1)
try:
data = np.array(ain.getSamples(BUFFER_SIZE)[channel][int(BUFFER_SIZE* 0.05):])
except:
print('Timeout occured')

aout.stop()
ain.stopAcquisition()

param_args = {
'sine': {
'threshold': 0.1,
'find_peaks_args': {
'prominence': (0.25, 1),
'height': (0.1, amplitude),
},
},
'dc': {
'threshold': 0.4,
'find_peaks_args': {
'prominence': 1,
'height': 0.1,
},
},
}

peaks_pos, _ = find_peaks(data, **param_args[waveform]["find_peaks_args"])
peaks_neg, _ = find_peaks(-data, **param_args[waveform]["find_peaks_args"])

peaks = np.concatenate((peaks_pos, peaks_neg))
filtered_peaks = list(filter(lambda peak: is_spike(data, peak, param_args[waveform]["threshold"]), peaks))
num_peaks = len(filtered_peaks)

data_string.append(
"Number of glitch peaks found in " + waveform + " signal :" + str(num_peaks))

if gen_reports:
write_file(file, test_name, channel, data_string)
plot_to_file(f'Buffer Glitch , channel{channel}',
data,
dir_name,
f'buffer_glitch_plot_ch{channel}_{waveform}.png',
data_marked=filtered_peaks)

return num_peaks
89 changes: 89 additions & 0 deletions tests/digital_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,92 @@ def test_kernel_buffers(dig, nb_kernel_buffers):
except:
error = True
return error


def write_file(file, test_name, channel, data_string):
if test_name == "pattern_generator_pulse":
file.write("\n\nTest pattern generator on DIO_" + str(channel) + '\n')

for i in range(len(data_string)):
file.write(str(data_string[i]) + '\n')


def count_edges(data, threshold = 0.5):
logic_data = np.where(data > threshold, 1, 0) # replace with logic values
rising_edges = np.sum(np.diff(logic_data) > 0)
falling_edges = np.sum(np.diff(-logic_data + 1) > 0) # inverted signal -> falling edges
return rising_edges + falling_edges


def test_pattern_generator_pulse(dig, d_trig, channel):
if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []

timeout = 15_000 # in milliseconds
delay = 8192
buffer_size = 100_000
sampling_frequency_in = 10_000_000
sampling_frequency_out = 10_000

test_name = "pattern_generator_pulse"
data_string = []

file_name, dir_name, csv_path = result_files(gen_reports)
dig.reset()
ctx.setTimeout(timeout)

dig.setSampleRateIn(sampling_frequency_in)
dig.setSampleRateOut(sampling_frequency_out)
dig.setCyclic(False)

d_trig.reset()
d_trig.setDigitalMode(libm2k.DIO_OR)
d_trig.setDigitalStreamingFlag(False)
# only tested channel should trigger acquisition
for i in range(16):
d_trig.setDigitalCondition(i, libm2k.NO_TRIGGER_DIGITAL)
d_trig.setDigitalCondition(channel, libm2k.RISING_EDGE_DIGITAL)
d_trig.setDigitalDelay(-delay)

dig.startAcquisition(buffer_size)

dig.setDirection(channel, libm2k.DIO_OUTPUT)
dig.setValueRaw(channel, libm2k.LOW) # setting chn to raw 0 before enable does not fix the bug
dig.enableChannel(channel,True)

# expected: line start LOW and then stays HIGH
# each 0xFFFF should create 1 edge in the current channel
# 0, 0, 0, 0xFFFF, 0xFFFF, 0 , 0, 0xFFFF 0xFFFF, 0 , 0, 0xFFFF
# 1 2 3 4 5
buff = np.tile(A = np.array([0xFFFF, 0 , 0, 0xFFFF]), reps = 2)
buff= np.insert(buff, 0, [0, 0, 0, 0xFFFF])

expected_num_edges = (buff == 0xFFFF).sum()
buff = buff.tolist()
dig.push(buff)

data = dig.getSamples(buffer_size)
crnt_chn_dio_data = np.array(list(map(lambda s: (((0x0001 << channel) & int(s)) >> channel), data)))
actual_num_edges = count_edges(crnt_chn_dio_data)
extra_edges = abs(expected_num_edges - actual_num_edges)

data_string.append(
"Expected: " + str(expected_num_edges) + " , found: " + str(actual_num_edges))

if gen_reports:
write_file(file, test_name, channel, data_string)
plot_to_file("Pattern generator on ch" + str(channel), crnt_chn_dio_data, dir_name,
"digital_pattern_generator_ch" + str(channel) + ".png")

dig.stopAcquisition()
dig.stopBufferOut()

return extra_edges
14 changes: 13 additions & 1 deletion tests/m2k_analog_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from shapefile import shape_gen, ref_shape_gen, shape_name
from analog_functions import test_amplitude, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \
test_voltmeter_functionality, test_kernel_buffers
test_voltmeter_functionality, test_kernel_buffers, test_buffer_transition_glitch
from analog_functions import noncyclic_buffer_test, set_samplerates_for_shapetest, set_trig_for_cyclicbuffer_test, \
test_calibration
from analog_functions import compare_in_out_frequency, test_oversampling_ratio, channels_diff_in_samples, test_timeout, \
Expand Down Expand Up @@ -240,3 +240,15 @@ def test_timeout(self):
else:
with self.subTest(msg='No timeout'):
self.assertEqual(data, True, 'Data was not acquired correctly')

@unittest.skipIf(ctx.getFirmwareVersion() < 'v0.32', 'This is a known bug in previous firmware implementations which is fixed in v0.32')
def test_buffer_transition_glitch(self):
# Pushing a new cyclic buffer should output the value of the raw attr. In previous firmware versions, the new buffer would start
# with the last sample from previous buffer which lead to a glitch in the output signal. This test verifies that the glitch is not present anymore.

for channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2]:
for waveform in ['dc', 'sine']:
num_glitches = test_buffer_transition_glitch(channel, ain, aout, trig, waveform)

with self.subTest(msg='Test buffer transition glitch: ' + waveform + ' on ch' + str(channel)):
self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel))
14 changes: 12 additions & 2 deletions tests/m2k_digital_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import unittest
import libm2k
from digital_functions import dig_reset, set_digital_trigger, check_digital_channels_state, check_digital_output, \
check_digital_trigger, check_open_drain_mode, test_kernel_buffers
check_digital_trigger, check_open_drain_mode, test_kernel_buffers, test_pattern_generator_pulse
from digital_functions import test_digital_cyclic_buffer
from open_context import dig, d_trig
from open_context import ctx, dig, d_trig
import logger
from repeat_test import repeat

Expand Down Expand Up @@ -50,3 +50,13 @@ def test_kernel_buffers(self):
with self.subTest(
msg='Set kernel buffers count on Digital In without raising an error '):
self.assertEqual(test_err, False, 'Error occured')

@unittest.skip("This fix is a known bug which was not fixed in firmware v0.32")
def test_pattern_generator_pulse(self):
# Verifies that the pattern generator does not generate any additional edges. Currently it generates 1 additional edge
# before outputting the pattern set. At the end it holds the value of the last sample at the ouput.
# The measured pattern should be the same as the one set.
for i in range(16):
test_result = test_pattern_generator_pulse(dig, d_trig, i)
with self.subTest(i):
self.assertEqual(test_result, 0, "Found " + str(test_result) + " aditional edges on Channel: " + str(i))
4 changes: 3 additions & 1 deletion tests/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def wait_():
"test_phase_difference_between_channels_in_samples\n"
"test_shapes_ch0\n"
"test_shapes_ch1\n"
"test_voltmeter\n")
"test_voltmeter\n"
"test_buffer_transition_glitch\n")
print("\n ===== class B_TriggerTests ===== \n")
print(" ===== tests ====== \n")
print("test_1_trigger_object\n"
Expand All @@ -92,6 +93,7 @@ def wait_():
print("test_trig_conditions\n")
print("test_cyclic_buffer\n")
print("test_kernel_buffers\n")
print("test_pattern_generator_pulse\n")

exit()
elif len(sys.argv) > 1 and "nofiles" in sys.argv:
Expand Down
Loading