Skip to content

Commit

Permalink
now testing per profile and not per sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
AviaAv committed Dec 7, 2023
1 parent de8c872 commit ca25baf
Showing 1 changed file with 43 additions and 49 deletions.
92 changes: 43 additions & 49 deletions unit-tests/live/frames/test-fps-permutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
# run perform_fps_test(sensor_profiles_array, [mode])
ALL_PERMUTATIONS = 1
ALL_PAIRS = 2
ALL_SENSORS = 3
ALL_STREAMS = 3

# global variable used to count on all the sensors simultaneously
count_frames = False
Expand All @@ -46,18 +46,12 @@ def check_fps_pair(measured_fps, expected_fps):

def get_expected_fps(sensor_profiles_dict):
"""
For every sensor, find the expected fps according to its profiles
Return a dictionary between the sensor and its expected fps
Returns a dictionary between each profile and its expected fps
"""
expected_fps_dict = {}
# log.d(sensor_profiles_dict)
for key in sensor_profiles_dict:
avg = 0
for profile in sensor_profiles_dict[key]:
avg += profile.fps()

avg /= len(sensor_profiles_dict[key])
expected_fps_dict[key] = avg
for sensor, profiles in sensor_profiles_dict.items():
for profile in profiles:
expected_fps_dict[profile.stream_name()] = profile.fps()
return expected_fps_dict


Expand All @@ -69,7 +63,7 @@ def should_test(mode, permutation):
If the mode is ALL_PAIRS return true only if there are exactly two streams to be tested
"""
return ((mode == ALL_PERMUTATIONS) or
(mode == ALL_SENSORS and all(v == 1 for v in permutation)) or
(mode == ALL_STREAMS and all(v == 1 for v in permutation)) or
(mode == ALL_PAIRS and permutation.count(1) == 2))


Expand Down Expand Up @@ -108,9 +102,9 @@ def get_time_est_string(num_profiles, modes):
# test_time = math.comb(num_profiles, 2) * time_per_test
test_time = choose(num_profiles, 2) * time_per_test
details_str += f"{choose(num_profiles,2)} tests for all pairs"
elif mode == ALL_SENSORS:
elif mode == ALL_STREAMS:
test_time = time_per_test
details_str += f"1 test for all sensors on"
details_str += f"1 test for all streams on"

details_str += " + "
total_time += test_time
Expand Down Expand Up @@ -139,55 +133,56 @@ def get_tested_profiles_string(sensor_profiles_dict):
#############################################
def check_fps_dict(measured_fps, expected_fps):
all_fps_ok = True
for key in expected_fps:
res = check_fps_pair(measured_fps[key], expected_fps[key])
for profile_name in expected_fps:
res = check_fps_pair(measured_fps[profile_name], expected_fps[profile_name])
if not res:
all_fps_ok = False
log.d(f"Expected {expected_fps[key]} fps, received {measured_fps[key]} fps in sensor {key.name}"
log.d(f"Expected {expected_fps[profile_name]} fps, received {measured_fps[profile_name]} fps in profile"
f" {profile_name}"
f" { '(Pass)' if res else '(Fail)' }")
return all_fps_ok


def generate_functions(sensor_profiles_dict):
def generate_functions(sensor_profiles_dict, profile_name_fps_dict, profile_name_lock_dict):
"""
Creates callable functions for each sensor to be triggered when a new frame arrives
Used to count frames received for measuring fps
"""
locks = {}
sensor_function_dict = {}
for sensor_key in sensor_profiles_dict:
new_lock = threading.Lock()
locks[sensor_key] = new_lock

def on_frame_received(frame, key=sensor_key):
def on_frame_received(frame): # variables declared on generate_functions should not be used here
global count_frames
if count_frames:
with locks[key]:
sensor_profiles_dict[key] += 1
profile_name = frame.profile.stream_name()
with profile_name_lock_dict[profile_name]: # lock and count frame
profile_name_fps_dict[profile_name] += 1

sensor_function_dict[sensor_key] = on_frame_received
return sensor_function_dict


def measure_fps(sensor_profiles_dict):
"""
Given a dictionary of sensors and profiles to test, activate all sensors on the given profiles
Given a dictionary of sensors and profiles to test, activate all streams on the given profiles
and measure fps
Return a dictionary of sensors and the fps measured for them
Return a dictionary of profiles and the fps measured for them
"""
global TIME_FOR_STEADY_STATE
global TIME_TO_COUNT_FRAMES

global count_frames
count_frames = False

# initialize fps dict
sensor_fps_dict = {}
for key in sensor_profiles_dict:
sensor_fps_dict[key] = 0
# initialize fps and locks dict
profile_name_fps_dict = {}
profile_name_lock_dict = {}
for sensor, profiles in sensor_profiles_dict.items():
for profile in profiles:
profile_name_fps_dict[profile.stream_name()] = 0
profile_name_lock_dict[profile.stream_name()] = threading.Lock()

# generate sensor-callable dictionary
funcs_dict = generate_functions(sensor_fps_dict)
funcs_dict = generate_functions(sensor_profiles_dict, profile_name_fps_dict, profile_name_lock_dict)

for sensor, profiles in sensor_profiles_dict.items():
sensor.open(profiles)
Expand All @@ -200,27 +195,23 @@ def measure_fps(sensor_profiles_dict):
count_frames = False # Stop counting

for sensor, profiles in sensor_profiles_dict.items():
sensor_fps_dict[sensor] /= len(profiles) # number of profiles on the sensor
sensor_fps_dict[sensor] /= TIME_TO_COUNT_FRAMES
# now for each sensor we have the average fps received
for profile in profiles:
profile_name_fps_dict[profile.stream_name()] /= TIME_TO_COUNT_FRAMES

sensor.stop()
sensor.close()

return sensor_fps_dict
return profile_name_fps_dict


def get_test_details_str(sensor_profile_dict, expected_fps_dict):
def get_test_details_str(sensor_profile_dict):
s = ""
for sensor_key in sensor_profile_dict:
if len(sensor_profile_dict[sensor_key]) > 1:
s += f"Expected average fps for {sensor_key.name} is {expected_fps_dict[sensor_key]}:\n"
for profile in sensor_profile_dict[sensor_key]:
s += f"Profile {profile.stream_name()} expects {profile.fps()} fps on {get_resolution(profile)}\n"
else:
s += (f"Expected fps for sensor {sensor_key.name} on profile "
f"{sensor_profile_dict[sensor_key][0].stream_name()} is {expected_fps_dict[sensor_key]}"
f" on {get_resolution(sensor_profile_dict[sensor_key][0])}\n")
for sensor, profiles in sensor_profile_dict.items():
for profile in profiles:
s += (f"Expected fps for profile {profile.stream_name()} on sensor "
f"{sensor.name} is {profile.fps()} "
f"on {get_resolution(profile)}\n")

s = s.replace("on (0, 0)", "") # remove no resolution for Motion Module profiles
return s

Expand All @@ -243,7 +234,7 @@ def perform_fps_test(sensor_profiles_arr, modes):
partial_dict = get_dict_for_permutation(sensor_profiles_arr, perm)
test.start("Testing", get_tested_profiles_string(partial_dict))
expected_fps = get_expected_fps(partial_dict)
log.d(get_test_details_str(partial_dict, expected_fps))
log.d(get_test_details_str(partial_dict))
fps_dict = measure_fps(partial_dict)
test.check(check_fps_dict(fps_dict, expected_fps))
test.finish()
Expand All @@ -261,6 +252,9 @@ def get_profiles_by_resolution(sensor, resolution, fps=None):
if fps is None or p.fps() == fps:
# to avoid having a long run time, we don't choose the same stream more than once
if p.stream_type() not in stream_types_added:
if p.stream_type() == rs.stream.infrared:
if p.stream_index() != 1:
continue # on some devices, using Infrared 1 seems to have better fps than IR/IR2
profiles.append(p)
stream_types_added.append(p.stream_type())
return profiles
Expand Down Expand Up @@ -288,7 +282,7 @@ def get_mutual_resolution(sensor):
if len(profiles) == len(stream_resolutions_dict):
return profiles

# if none found, try to find a resolution that all profiles have, on any fps (fps will be taken as an average later)
# if none found, try to find a resolution that all profiles have, on any fps
for option in possible_combinations:
profiles = get_profiles_by_resolution(sensor, option[0], None)
if len(profiles) == len(stream_resolutions_dict):
Expand Down Expand Up @@ -327,7 +321,7 @@ def get_sensors_and_profiles(device):
if TEST_ALL_COMBINATIONS:
test_modes = [ALL_PERMUTATIONS]
else:
test_modes = [ALL_PAIRS, ALL_SENSORS]
test_modes = [ALL_PAIRS, ALL_STREAMS]

perform_fps_test(sensor_profiles_array, test_modes)

Expand Down

0 comments on commit ca25baf

Please sign in to comment.