diff --git a/Tools/autotest/vehicle_test_suite.py b/Tools/autotest/vehicle_test_suite.py index 07a0e13b43654..b788843a13a8b 100644 --- a/Tools/autotest/vehicle_test_suite.py +++ b/Tools/autotest/vehicle_test_suite.py @@ -465,6 +465,47 @@ def progress_text(self, current_value): return (f"Want=({self.target.lat},{self.target.lng}) distance={self.horizontal_error(current_value)}") +class WaitAndMaintainEKFFlags(WaitAndMaintain): + '''Waits for EKF status flags to include required_flags and have + error_bits *not* set.''' + def __init__(self, test_suite, required_flags, error_bits, **kwargs): + super(WaitAndMaintainEKFFlags, self).__init__(test_suite, **kwargs) + self.required_flags = required_flags + self.error_bits = error_bits + self.last_EKF_STATUS_REPORT = None + + def announce_start_text(self): + return f"Waiting for EKF value {self.required_flags}" + + def get_current_value(self): + self.last_EKF_STATUS_REPORT = self.test_suite.assert_receive_message('EKF_STATUS_REPORT', timeout=10) + return self.last_EKF_STATUS_REPORT.flags + + def validate_value(self, value): + if value & self.error_bits: + return False + + if (value & self.required_flags) != self.required_flags: + return False + + return True + + def success_text(self): + return "EKF Flags OK" + + def timeoutexception(self): + self.progress("Last EKF status report:") + self.progress(self.test_suite.dump_message_verbose(self.last_EKF_STATUS_REPORT)) + + return AutoTestTimeoutException(f"Failed to get EKF.flags={self.required_flags}") + + def progress_text(self, current_value): + error_bits_str = "" + if current_value & self.error_bits: + error_bits_str = " (error bits present)" + return (f"Want=({self.required_flags}) got={current_value}{error_bits_str}") + + class WaitAndMaintainArmed(WaitAndMaintain): def get_current_value(self): return self.test_suite.armed() @@ -8017,33 +8058,8 @@ def wait_ekf_happy(self, timeout=45, require_absolute=True): error_bits |= mavutil.mavlink.ESTIMATOR_GPS_GLITCH self.wait_ekf_flags(required_value, error_bits, timeout=timeout) - def wait_ekf_flags(self, required_value, error_bits, timeout=30): - self.progress("Waiting for EKF value %u" % required_value) - last_print_time = 0 - tstart = self.get_sim_time() - m = None - while timeout is None or self.get_sim_time_cached() < tstart + timeout: - m = self.mav.recv_match(type='EKF_STATUS_REPORT', blocking=True, timeout=timeout) - if m is None: - continue - current = m.flags - errors = current & error_bits - everything_ok = (errors == 0 and - current & required_value == required_value) - if everything_ok or self.get_sim_time_cached() - last_print_time > 1: - self.progress("Wait EKF.flags: required:%u current:%u errors=%u" % - (required_value, current, errors)) - last_print_time = self.get_sim_time_cached() - if everything_ok: - self.progress("EKF Flags OK") - return True - m_str = str(m) - if m is not None: - m_str = self.dump_message_verbose(m) - self.progress("Last EKF_STATUS_REPORT message:") - self.progress(m_str) - raise AutoTestTimeoutException("Failed to get EKF.flags=%u" % - required_value) + def wait_ekf_flags(self, required_value, error_bits, **kwargs): + WaitAndMaintainEKFFlags(self, required_value, error_bits, **kwargs).run() def wait_gps_disable(self, position_horizontal=True, position_vertical=False, timeout=30): """Disable GPS and wait for EKF to report the end of assistance from GPS."""