diff --git a/tests/analog_functions.py b/tests/analog_functions.py index 4e0ee4a1..eb018888 100644 --- a/tests/analog_functions.py +++ b/tests/analog_functions.py @@ -1053,7 +1053,7 @@ def test_oversampling_ratio(channel, ain, aout, trig): return test_osr -def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None): +def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None, data_marked=None): # Saves the plots in a separate folder # Arguments: # title -- Title of the plot @@ -1063,6 +1063,7 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data # xlabel -- Label of x-Axis (default: {None}) # ylabel -- Label of y-Axis(default: {None}) # data1 -- Data that should be plotted on the same plot(default: {None}) + # data_marked -- Data that represents specific points on the plot(default: {None}) # plot the signals in a separate folder plt.title(title) if xlabel is not None: # if xlabel and ylabel are not specified there will be default values @@ -1077,6 +1078,8 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case) if data1 is not None: plt.plot(data1) + if data_marked is not None: + plt.plot(data_marked, data[data_marked], 'xr') plt.savefig(dir_name + "/" + filename) plt.close() return @@ -1195,6 +1198,8 @@ def write_file(file, test_name, channel, data_string): file.write("\n\nTest of analog input and output with different signal shapes:\n") elif test_name == "amplitude": file.write("\n\nAmplitude test on channel " + str(channel) + ": \n") + elif test_name == "buffer_transition_glitch": + file.write("\n\nTest buffer transition glitch on channel " + str(channel) + ": \n") for i in range(len(data_string)): file.write(str(data_string[i]) + '\n') @@ -1282,3 +1287,125 @@ def test_kernel_buffers(ain, trig, nb_kernel_buffers): except: error = True return error + +def compute_percentage_diff(v1, v2): + # https://www.oracle.com/webfolder/technetwork/data-quality/edqhelp/content/processor_library/matching/comparisons/percent_difference.htm + vals = sorted([v1, v2]) + percentage_diff = 0 + try: + percentage_diff = ((vals[1] - vals[0]) / vals[1]) + except: + pass + return percentage_diff + +def compute_y_distance(v1, v2): + lens = sorted([v1, v2]) + distance = lens[1] - lens[0] + return distance + +def is_spike(data, peak, threshold = 0.25): + # for sampling_frequency_in = 1_000_000 the center of of the glitch is at 75 samples distance with repect to the peak + dx_small = 75 + dx_large = 200 + + prev_sample, next_sample = data[peak - dx_small], data[peak + dx_small] + step_inside_glitch_range = compute_y_distance(prev_sample, next_sample) + prev_sample, next_sample = data[peak - dx_large], data[peak + dx_large] + step_outside_glitch_range = compute_y_distance(prev_sample, next_sample) + + percentage_dif = compute_percentage_diff(step_inside_glitch_range, step_outside_glitch_range) + return percentage_dif > threshold + +def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude=1): + if gen_reports: + from create_files import results_file, results_dir, csv, open_files_and_dirs + if results_file is None: + file, dir_name, csv_path = open_files_and_dirs() + else: + file = results_file + dir_name = results_dir + csv_path = csv + else: + file = [] + + BUFFER_SIZE = 5_00_000 + + reset.analog_in(ain) + reset.analog_out(aout) + reset.trigger(trig) + + test_name = "buffer_transition_glitch" + data_string = [] + + dac_sr = 75_000 + adc_sr = 1_000_000 + + ain.setSampleRate(adc_sr) + ain.setRange(channel, libm2k.PLUS_MINUS_2_5V) + + set_trig(trig, channel, 0, libm2k.RISING_EDGE_ANALOG, 0.1) + + aout.setSampleRate(channel, dac_sr) + aout.enableChannel(channel, True) + aout.setCyclic(True) + ctx.setTimeout(10000) + + out_samples = 4096 + if waveform == 'sine': + offset = 0 + data_high = amplitude* np.sin(np.linspace(offset, 2*np.pi + offset, out_samples )) + data_low = -amplitude* np.sin(np.linspace(offset, 2*np.pi + offset, out_samples)) + if waveform == 'dc': + data_high = [amplitude] * out_samples + data_low = [-amplitude] * out_samples + + ain.startAcquisition(BUFFER_SIZE) + for _ in range(5): + aout.push(channel, data_high) + time.sleep(0.1) + aout.push(channel, data_low) + time.sleep(0.1) + try: + data = np.array(ain.getSamples(BUFFER_SIZE)[channel][int(BUFFER_SIZE* 0.05):]) + except: + print('Timeout occured') + + aout.stop() + ain.stopAcquisition() + + param_args = { + 'sine': { + 'threshold': 0.1, + 'find_peaks_args': { + 'prominence': (0.25, 1), + 'height': (0.1, amplitude), + }, + }, + 'dc': { + 'threshold': 0.4, + 'find_peaks_args': { + 'prominence': 1, + 'height': 0.1, + }, + }, + } + + peaks_pos, _ = find_peaks(data, **param_args[waveform]["find_peaks_args"]) + peaks_neg, _ = find_peaks(-data, **param_args[waveform]["find_peaks_args"]) + + peaks = np.concatenate((peaks_pos, peaks_neg)) + filtered_peaks = list(filter(lambda peak: is_spike(data, peak, param_args[waveform]["threshold"]), peaks)) + num_peaks = len(filtered_peaks) + + data_string.append( + "Number of glitch peaks found in " + waveform + " signal :" + str(num_peaks)) + + if gen_reports: + write_file(file, test_name, channel, data_string) + plot_to_file(f'Buffer Glitch , channel{channel}', + data, + dir_name, + f'buffer_glitch_plot_ch{channel}_{waveform}.png', + data_marked=filtered_peaks) + + return num_peaks \ No newline at end of file diff --git a/tests/m2k_analog_test.py b/tests/m2k_analog_test.py index f692849f..79bb8203 100644 --- a/tests/m2k_analog_test.py +++ b/tests/m2k_analog_test.py @@ -4,7 +4,7 @@ from shapefile import shape_gen, ref_shape_gen, shape_name from analog_functions import test_amplitude, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \ - test_voltmeter_functionality, test_kernel_buffers + test_voltmeter_functionality, test_kernel_buffers, test_buffer_transition_glitch from analog_functions import noncyclic_buffer_test, set_samplerates_for_shapetest, set_trig_for_cyclicbuffer_test, \ test_calibration from analog_functions import compare_in_out_frequency, test_oversampling_ratio, channels_diff_in_samples, test_timeout, \ @@ -240,3 +240,15 @@ def test_timeout(self): else: with self.subTest(msg='No timeout'): self.assertEqual(data, True, 'Data was not acquired correctly') + + @unittest.skipIf(ctx.getFirmwareVersion() < 'v0.32', 'This is a known bug in previous firmware implementations which is fixed in v0.32') + def test_buffer_transition_glitch(self): + # Pushing a new cyclic buffer should output the value of the raw attr. In previous firmware versions, the new buffer would start + # with the last sample from previous buffer which lead to a glitch in the output signal. This test verifies that the glitch is not present anymore. + + for channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2]: + for waveform in ['dc', 'sine']: + num_glitches = test_buffer_transition_glitch(channel, ain, aout, trig, waveform) + + with self.subTest(msg='Test buffer transition glitch: ' + waveform + ' on ch' + str(channel)): + self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel)) \ No newline at end of file diff --git a/tests/main.py b/tests/main.py index e4508553..43de5e7e 100644 --- a/tests/main.py +++ b/tests/main.py @@ -73,7 +73,8 @@ def wait_(): "test_phase_difference_between_channels_in_samples\n" "test_shapes_ch0\n" "test_shapes_ch1\n" - "test_voltmeter\n") + "test_voltmeter\n" + "test_buffer_transition_glitch\n") print("\n ===== class B_TriggerTests ===== \n") print(" ===== tests ====== \n") print("test_1_trigger_object\n"