From ca25baffcb11a7daa498dbfb83aa73425630620a Mon Sep 17 00:00:00 2001 From: Avia Avraham <145359432+AviaAv@users.noreply.github.com> Date: Sun, 12 Nov 2023 15:30:24 +0200 Subject: [PATCH] now testing per profile and not per sensor --- .../live/frames/test-fps-permutations.py | 92 +++++++++---------- 1 file changed, 43 insertions(+), 49 deletions(-) diff --git a/unit-tests/live/frames/test-fps-permutations.py b/unit-tests/live/frames/test-fps-permutations.py index a40b44621f6..daba960a28f 100644 --- a/unit-tests/live/frames/test-fps-permutations.py +++ b/unit-tests/live/frames/test-fps-permutations.py @@ -21,7 +21,7 @@ # run perform_fps_test(sensor_profiles_array, [mode]) ALL_PERMUTATIONS = 1 ALL_PAIRS = 2 -ALL_SENSORS = 3 +ALL_STREAMS = 3 # global variable used to count on all the sensors simultaneously count_frames = False @@ -46,18 +46,12 @@ def check_fps_pair(measured_fps, expected_fps): def get_expected_fps(sensor_profiles_dict): """ - For every sensor, find the expected fps according to its profiles - Return a dictionary between the sensor and its expected fps + Returns a dictionary between each profile and its expected fps """ expected_fps_dict = {} - # log.d(sensor_profiles_dict) - for key in sensor_profiles_dict: - avg = 0 - for profile in sensor_profiles_dict[key]: - avg += profile.fps() - - avg /= len(sensor_profiles_dict[key]) - expected_fps_dict[key] = avg + for sensor, profiles in sensor_profiles_dict.items(): + for profile in profiles: + expected_fps_dict[profile.stream_name()] = profile.fps() return expected_fps_dict @@ -69,7 +63,7 @@ def should_test(mode, permutation): If the mode is ALL_PAIRS return true only if there are exactly two streams to be tested """ return ((mode == ALL_PERMUTATIONS) or - (mode == ALL_SENSORS and all(v == 1 for v in permutation)) or + (mode == ALL_STREAMS and all(v == 1 for v in permutation)) or (mode == ALL_PAIRS and permutation.count(1) == 2)) @@ -108,9 +102,9 @@ def get_time_est_string(num_profiles, modes): # test_time = math.comb(num_profiles, 2) * time_per_test test_time = choose(num_profiles, 2) * time_per_test details_str += f"{choose(num_profiles,2)} tests for all pairs" - elif mode == ALL_SENSORS: + elif mode == ALL_STREAMS: test_time = time_per_test - details_str += f"1 test for all sensors on" + details_str += f"1 test for all streams on" details_str += " + " total_time += test_time @@ -139,31 +133,29 @@ def get_tested_profiles_string(sensor_profiles_dict): ############################################# def check_fps_dict(measured_fps, expected_fps): all_fps_ok = True - for key in expected_fps: - res = check_fps_pair(measured_fps[key], expected_fps[key]) + for profile_name in expected_fps: + res = check_fps_pair(measured_fps[profile_name], expected_fps[profile_name]) if not res: all_fps_ok = False - log.d(f"Expected {expected_fps[key]} fps, received {measured_fps[key]} fps in sensor {key.name}" + log.d(f"Expected {expected_fps[profile_name]} fps, received {measured_fps[profile_name]} fps in profile" + f" {profile_name}" f" { '(Pass)' if res else '(Fail)' }") return all_fps_ok -def generate_functions(sensor_profiles_dict): +def generate_functions(sensor_profiles_dict, profile_name_fps_dict, profile_name_lock_dict): """ Creates callable functions for each sensor to be triggered when a new frame arrives Used to count frames received for measuring fps """ - locks = {} sensor_function_dict = {} for sensor_key in sensor_profiles_dict: - new_lock = threading.Lock() - locks[sensor_key] = new_lock - - def on_frame_received(frame, key=sensor_key): + def on_frame_received(frame): # variables declared on generate_functions should not be used here global count_frames if count_frames: - with locks[key]: - sensor_profiles_dict[key] += 1 + profile_name = frame.profile.stream_name() + with profile_name_lock_dict[profile_name]: # lock and count frame + profile_name_fps_dict[profile_name] += 1 sensor_function_dict[sensor_key] = on_frame_received return sensor_function_dict @@ -171,9 +163,9 @@ def on_frame_received(frame, key=sensor_key): def measure_fps(sensor_profiles_dict): """ - Given a dictionary of sensors and profiles to test, activate all sensors on the given profiles + Given a dictionary of sensors and profiles to test, activate all streams on the given profiles and measure fps - Return a dictionary of sensors and the fps measured for them + Return a dictionary of profiles and the fps measured for them """ global TIME_FOR_STEADY_STATE global TIME_TO_COUNT_FRAMES @@ -181,13 +173,16 @@ def measure_fps(sensor_profiles_dict): global count_frames count_frames = False - # initialize fps dict - sensor_fps_dict = {} - for key in sensor_profiles_dict: - sensor_fps_dict[key] = 0 + # initialize fps and locks dict + profile_name_fps_dict = {} + profile_name_lock_dict = {} + for sensor, profiles in sensor_profiles_dict.items(): + for profile in profiles: + profile_name_fps_dict[profile.stream_name()] = 0 + profile_name_lock_dict[profile.stream_name()] = threading.Lock() # generate sensor-callable dictionary - funcs_dict = generate_functions(sensor_fps_dict) + funcs_dict = generate_functions(sensor_profiles_dict, profile_name_fps_dict, profile_name_lock_dict) for sensor, profiles in sensor_profiles_dict.items(): sensor.open(profiles) @@ -200,27 +195,23 @@ def measure_fps(sensor_profiles_dict): count_frames = False # Stop counting for sensor, profiles in sensor_profiles_dict.items(): - sensor_fps_dict[sensor] /= len(profiles) # number of profiles on the sensor - sensor_fps_dict[sensor] /= TIME_TO_COUNT_FRAMES - # now for each sensor we have the average fps received + for profile in profiles: + profile_name_fps_dict[profile.stream_name()] /= TIME_TO_COUNT_FRAMES sensor.stop() sensor.close() - return sensor_fps_dict + return profile_name_fps_dict -def get_test_details_str(sensor_profile_dict, expected_fps_dict): +def get_test_details_str(sensor_profile_dict): s = "" - for sensor_key in sensor_profile_dict: - if len(sensor_profile_dict[sensor_key]) > 1: - s += f"Expected average fps for {sensor_key.name} is {expected_fps_dict[sensor_key]}:\n" - for profile in sensor_profile_dict[sensor_key]: - s += f"Profile {profile.stream_name()} expects {profile.fps()} fps on {get_resolution(profile)}\n" - else: - s += (f"Expected fps for sensor {sensor_key.name} on profile " - f"{sensor_profile_dict[sensor_key][0].stream_name()} is {expected_fps_dict[sensor_key]}" - f" on {get_resolution(sensor_profile_dict[sensor_key][0])}\n") + for sensor, profiles in sensor_profile_dict.items(): + for profile in profiles: + s += (f"Expected fps for profile {profile.stream_name()} on sensor " + f"{sensor.name} is {profile.fps()} " + f"on {get_resolution(profile)}\n") + s = s.replace("on (0, 0)", "") # remove no resolution for Motion Module profiles return s @@ -243,7 +234,7 @@ def perform_fps_test(sensor_profiles_arr, modes): partial_dict = get_dict_for_permutation(sensor_profiles_arr, perm) test.start("Testing", get_tested_profiles_string(partial_dict)) expected_fps = get_expected_fps(partial_dict) - log.d(get_test_details_str(partial_dict, expected_fps)) + log.d(get_test_details_str(partial_dict)) fps_dict = measure_fps(partial_dict) test.check(check_fps_dict(fps_dict, expected_fps)) test.finish() @@ -261,6 +252,9 @@ def get_profiles_by_resolution(sensor, resolution, fps=None): if fps is None or p.fps() == fps: # to avoid having a long run time, we don't choose the same stream more than once if p.stream_type() not in stream_types_added: + if p.stream_type() == rs.stream.infrared: + if p.stream_index() != 1: + continue # on some devices, using Infrared 1 seems to have better fps than IR/IR2 profiles.append(p) stream_types_added.append(p.stream_type()) return profiles @@ -288,7 +282,7 @@ def get_mutual_resolution(sensor): if len(profiles) == len(stream_resolutions_dict): return profiles - # if none found, try to find a resolution that all profiles have, on any fps (fps will be taken as an average later) + # if none found, try to find a resolution that all profiles have, on any fps for option in possible_combinations: profiles = get_profiles_by_resolution(sensor, option[0], None) if len(profiles) == len(stream_resolutions_dict): @@ -327,7 +321,7 @@ def get_sensors_and_profiles(device): if TEST_ALL_COMBINATIONS: test_modes = [ALL_PERMUTATIONS] else: - test_modes = [ALL_PAIRS, ALL_SENSORS] + test_modes = [ALL_PAIRS, ALL_STREAMS] perform_fps_test(sensor_profiles_array, test_modes)