Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/digital: coverage of features introduced in fw0.33 #360

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 237 additions & 31 deletions tests/digital_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,34 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data
plt.close()
return

def plot_to_file_all_channels(title, data, dir_name, filename, xlabel=None, ylabel=None):
# Saves the plots in a separate folder
# Arguments:
# title -- Title of the plot\n
# data -- Data to be plotted\n
# filename -- Name of the file with the plot\n
# Keyword Arguments:
# xlabel -- Label of x-Axis (default: {None})
# ylabel -- Label of y-Axis(default: {None})

# plot the signals in a separate folder
plt.title(title)
if xlabel is not None: # if xlabel and ylabel are not specified there will be default values
plt.xlabel(xlabel)
else:
plt.xlabel('Samples')
if ylabel is not None:
plt.ylabel(ylabel)
else:
plt.ylabel('Voltage [V]')
plt.grid(visible=True)
for chn in range(16):
DIO_chn = np.array(list(map(lambda s: (((0x0001 << chn) & int(s)) >> chn), data)))
plt.plot(DIO_chn+chn) # offset the channels
plt.yticks(range(17))
plt.savefig(dir_name + "/" + filename)
plt.close()
return

def save_data_to_csv(csv_vals, csv_file):
df = DataFrame(csv_vals)
Expand Down Expand Up @@ -344,13 +372,19 @@ def write_file(file, test_name, channel, data_string):


def count_edges(data, threshold = 0.5):
logic_data = np.where(data > threshold, 1, 0) # replace with logic values
rising_edges = np.sum(np.diff(logic_data) > 0)
falling_edges = np.sum(np.diff(-logic_data + 1) > 0) # inverted signal -> falling edges
# Count number of edges for each digital channel
rising_edges = 0
falling_edges = 0
for channel in range(0, 16):
crnt_chn_dio = np.array(list(map(lambda s: (((0x0001 << channel) & int(s)) >> channel), data)))
rising_edges += np.sum(np.diff(crnt_chn_dio) > 0)
falling_edges += np.sum(np.diff(-crnt_chn_dio + 1) > 0) # inverted signal -> falling edges

return rising_edges + falling_edges


def test_pattern_generator_pulse(dig, d_trig, channel):
# channel == -1: means all channels
if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
Expand All @@ -362,13 +396,13 @@ def test_pattern_generator_pulse(dig, d_trig, channel):
else:
file = []

timeout = 15_000 # in milliseconds
timeout = 15_000 # [ms]
delay = 8192
buffer_size = 100_000
sampling_frequency_in = 10_000_000
sampling_frequency_out = 10_000

test_name = "pattern_generator_pulse"
test_name = "pattern_generator_glitch"
data_string = []

file_name, dir_name, csv_path = result_files(gen_reports)
Expand All @@ -382,43 +416,215 @@ def test_pattern_generator_pulse(dig, d_trig, channel):
d_trig.reset()
d_trig.setDigitalMode(libm2k.DIO_OR)
d_trig.setDigitalStreamingFlag(False)
# only tested channel should trigger acquisition

for i in range(16):
d_trig.setDigitalCondition(i, libm2k.NO_TRIGGER_DIGITAL)
d_trig.setDigitalCondition(channel, libm2k.RISING_EDGE_DIGITAL)
d_trig.setDigitalDelay(-delay)

dig.startAcquisition(buffer_size)
# Configure trigger
if channel == -1:
for i in range(16):
d_trig.setDigitalCondition(i, libm2k.RISING_EDGE_DIGITAL)
dig.setDirection(i, libm2k.DIO_OUTPUT)
dig.setValueRaw(i, libm2k.LOW)
dig.enableChannel(i,True)
else:
d_trig.setDigitalCondition(channel, libm2k.RISING_EDGE_DIGITAL)
dig.setDirection(channel, libm2k.DIO_OUTPUT)
dig.setValueRaw(channel, libm2k.LOW)
dig.enableChannel(channel,True)

dig.setDirection(channel, libm2k.DIO_OUTPUT)
dig.setValueRaw(channel, libm2k.LOW) # setting chn to raw 0 before enable does not fix the bug
dig.enableChannel(channel,True)
d_trig.setDigitalDelay(-delay)
dig.startAcquisition(buffer_size)

# expected: line start LOW and then stays HIGH
# each 0xFFFF should create 1 edge in the current channel
# 0, 0, 0, 0xFFFF, 0xFFFF, 0 , 0, 0xFFFF 0xFFFF, 0 , 0, 0xFFFF
# 1 2 3 4 5
buff = np.tile(A = np.array([0xFFFF, 0 , 0, 0xFFFF]), reps = 2)
buff= np.insert(buff, 0, [0, 0, 0, 0xFFFF])
# Generate pattern
# each sample==1 should create 1 edge in the current channel
# 0, 0, 0, sample, sample, 0 , 0, sample sample, 0 , 0, sample, sample, 0, 0
# 1 2 3 4 5 6
sample = 1 << channel if channel != -1 else 0xFFFF
TX_data = np.tile(A = np.array([sample, 0 , 0, sample]), reps = 2)
TX_data= np.insert(TX_data, 0, [0, 0, 0, sample])
TX_data = np.append(TX_data, [sample, 0, 0, 0])

expected_num_edges = (buff == 0xFFFF).sum()
buff = buff.tolist()
dig.push(buff)
expected_num_edges = count_edges(TX_data)
TX_data = TX_data.tolist()
dig.push(TX_data)

data = dig.getSamples(buffer_size)
crnt_chn_dio_data = np.array(list(map(lambda s: (((0x0001 << channel) & int(s)) >> channel), data)))
actual_num_edges = count_edges(crnt_chn_dio_data)
extra_edges = abs(expected_num_edges - actual_num_edges)
RX_data = dig.getSamples(buffer_size)
actual_num_edges = count_edges(np.array(RX_data))

data_string.append(
"Expected: " + str(expected_num_edges) + " , found: " + str(actual_num_edges))
extra_edges = abs(expected_num_edges - actual_num_edges)
data_string.append(f"\tExpected: {expected_num_edges}, found {actual_num_edges}")

if gen_reports:
write_file(file, test_name, channel, data_string)
plot_to_file("Pattern generator on ch" + str(channel), crnt_chn_dio_data, dir_name,
"digital_pattern_generator_ch" + str(channel) + ".png")

channel_name = f'DIO{channel}'if channel != -1 else 'all channels'
plot_to_file_all_channels(title=f"Pattern generator on {channel_name}",
data=RX_data, dir_name=dir_name,
filename=f"pattern_generator_glitch_{channel_name}.png",
xlabel='Samples', ylabel='DIO channel')
dig.stopAcquisition()
dig.stopBufferOut()

return extra_edges
return extra_edges


def generate_digital_clock(
n_samples: int,
duty: float,
channel = None
):
"""
Generates a digital clock signal with a specified number of samples, duty cycle, and optional channel.

Args:
n_samples (int): The total number of samples in the signal. Must be greater than or equal to 16 and a multiple of 4.
duty (float): The duty cycle of the clock signal as a fraction (0 <= duty <= 1). This defines the proportion of the signal that will be high.
channel (Optional[int]): The specific digital channel for which to generate the signal. If None, all channels will be set.
Returns:
List[int]: A list of integers representing the digital clock signal, where each element corresponds to a sample.
"""
assert n_samples >= 16, "Number of samples must be greater than 16"
assert n_samples % 4 == 0, "Number of samples must be a multiple of 4"
assert 0 <= duty <= 1, "Duty cycle must be between 0 and 1"

signal : np.ndarray = np.arange(n_samples) > (n_samples * duty) # should be 0s then 1s
if channel is not None:
signal = signal << channel
else:
signal = signal * 0xFFFF
return signal.tolist()


def verify_samples(
samples: np.ndarray,
expected_value: int,
position: str = 'end',
sample_range=5000
):
"""
Verifies that samples hold the expected value at the specified position.

Args:
samples (np.ndarray): Samples to verify.
expected_value (int): Expected value to hold.
position (str): Position to check ('start' or 'end').
sample_window (int): Number of samples to consider for verification.

Returns:
bool: True if verification passes, False otherwise.
"""
if position == 'start':
sample_segment = samples[:sample_range]
elif position == 'end':
sample_segment = samples[-sample_range:]
else:
raise ValueError("Invalid position. Must be 'start' or 'end'.")
return np.all(sample_segment == expected_value)


def get_DIO_chn_samples(samples, channel):
"""
Isolates the samples for a specific digital channel.
"""
channel_mask = 1 << channel
extracted_samples = [(samples & channel_mask) >> channel for samples in samples]
return extracted_samples


def test_last_sample_hold(dig: libm2k.M2kDigital, trig: libm2k.M2kHardwareTrigger, ctx:libm2k.M2k, channel=None):
if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []

test_name = "pattern_generator_last_sample_hold"
data_string = []

HIGH = 1
HIGH_ALL = 0xFFFF
HOLD_VALUE = HIGH if channel is not None else HIGH_ALL

delay = 8192
timeout = 15_000 # [ms]
buffer_size = 30_000
sampling_frequency_in = 100_000_000
sampling_frequency_out = 10_000_000
cyclic = False

buff = generate_digital_clock(n_samples=1024, duty=0.5, channel=channel)

ctx.setTimeout(timeout)
dig.stopAcquisition()
dig.stopBufferOut()
dig.reset()

dig.setSampleRateIn(sampling_frequency_in)
dig.setSampleRateOut(sampling_frequency_out)
assert dig.getSampleRateIn() == sampling_frequency_in, "Failed to set sample rate IN"
assert dig.getSampleRateOut() == sampling_frequency_out, "Failed to set sample rate OUT"

dig.setCyclic(False)
assert dig.getCyclic() == cyclic, "Failed to set cyclic mode"

# Digital trigger rests
trig.reset()
trig.setDigitalMode(libm2k.DIO_OR)
trig.setDigitalStreamingFlag(False)
for i in range(16):
trig.setDigitalCondition(i, libm2k.NO_TRIGGER_DIGITAL)

# Config trigger
if channel is not None:
trig.setDigitalCondition(channel, libm2k.RISING_EDGE_DIGITAL)
dig.setDirection(channel, libm2k.DIO_OUTPUT)
dig.enableChannel(channel, True)
else:
for i in range(16):
trig.setDigitalCondition(i, libm2k.RISING_EDGE_DIGITAL)
dig.setDirection(i, libm2k.DIO_OUTPUT)
dig.enableChannel(i, True)
trig.setDigitalDelay(-delay)

chn_str = str(channel) if channel is not None else "ALL"
# Step 1
dig.startAcquisition(buffer_size)
dig.push(buff)
RX_data = dig.getSamples(buffer_size)
samples = np.array(get_DIO_chn_samples(RX_data, channel) if channel is not None else RX_data)
result_step1 = verify_samples(samples, HOLD_VALUE, position='end', sample_range=50)
if gen_reports:
plot_to_file_all_channels(
title=f"Last sample hold on DIO_{chn_str}",
data=RX_data, dir_name=dir_name,
filename=f"last_sample_hold_DIO_{chn_str}_step{1}.png",
xlabel='Samples', ylabel='DIO channel'
)
time.sleep(0.15)

# Step 2
if channel is not None:
trig.setDigitalCondition(channel, libm2k.FALLING_EDGE_DIGITAL)
else:
for i in range(16):
trig.setDigitalCondition(i, libm2k.FALLING_EDGE_DIGITAL)
dig.push(buff)
RX_data = dig.getSamples(buffer_size)
samples = np.array(get_DIO_chn_samples(RX_data, channel) if channel is not None else RX_data)
result_step2 = (
verify_samples(samples, HOLD_VALUE, position='start', sample_range=50) and
verify_samples(samples, HOLD_VALUE, position='end', sample_range=50)
)
if gen_reports:
plot_to_file_all_channels(
title=f"Last sample hold on DIO_{chn_str}",
data=RX_data, dir_name=dir_name,
filename=f"last_sample_hold_DIO_{chn_str}_step{2}.png",
xlabel='Samples', ylabel='DIO channel'
)
return result_step1 and result_step2
31 changes: 26 additions & 5 deletions tests/m2k_digital_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
import libm2k
from digital_functions import dig_reset, set_digital_trigger, check_digital_channels_state, check_digital_output, \
check_digital_trigger, check_open_drain_mode, test_kernel_buffers, test_pattern_generator_pulse
check_digital_trigger, check_open_drain_mode, test_kernel_buffers, test_last_sample_hold, test_pattern_generator_pulse
from digital_functions import test_digital_cyclic_buffer
from open_context import ctx, dig, d_trig
import logger
Expand Down Expand Up @@ -51,12 +51,33 @@ def test_kernel_buffers(self):
msg='Set kernel buffers count on Digital In without raising an error '):
self.assertEqual(test_err, False, 'Error occured')

@unittest.skip("This fix is a known bug which was not fixed in firmware v0.32")
@unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33', 'Test applicable for firmware v0.33 and later.')
def test_pattern_generator_pulse(self):
# Verifies that the pattern generator does not generate any additional edges. Currently it generates 1 additional edge
# before outputting the pattern set. At the end it holds the value of the last sample at the ouput.
# Verifies that the pattern generator does not generate any additional edges prior to the pattern set.
# At the end it holds the value of the last sample at the output.
# The measured pattern should be the same as the one set.

# Single channel
for i in range(16):
test_result = test_pattern_generator_pulse(dig, d_trig, i)
with self.subTest(i):
self.assertEqual(test_result, 0, "Found " + str(test_result) + " aditional edges on Channel: " + str(i))
self.assertEqual(test_result, 0, "Found " + str(test_result) + " aditional edges on Channel: " + str(i))
# All channels
test_result = test_pattern_generator_pulse(dig, d_trig, -1)
with self.subTest(-1):
self.assertEqual(test_result, 0, "Found " + str(test_result) + " aditional edges in multi-channel test")

@unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33', 'Test applicable for firmware v0.33 and later.')
def test_last_sample_hold(self):
# Tests the last sample and hold functionality of the digital interface.
# - After the pattern is sent, the last sample should be held at the output.

# Single channel
for DIO_chn in range(16):
result_ok = test_last_sample_hold(dig, d_trig, ctx, DIO_chn)
with self.subTest(msg=f"DIO{str(DIO_chn)}"):
self.assertEqual(result_ok, True, f"Failed to hold the last sample on DIO{str(DIO_chn)}")
# All channels
result_ok = test_last_sample_hold(dig, d_trig, ctx, None)
with self.subTest(msg="DIO all"):
self.assertEqual(result_ok, True, f"Failed to hold the last sample on all DIO test")
3 changes: 2 additions & 1 deletion tests/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ def wait_():
print("test_trig_conditions\n")
print("test_cyclic_buffer\n")
print("test_kernel_buffers\n")
print("test_pattern_generator_pulse\n")
print("test_pattern_generator_glitch\n")
print("test_last_sample_hold\n")

exit()
elif len(sys.argv) > 1 and "nofiles" in sys.argv:
Expand Down
Loading