diff --git a/client/drivers/main.py b/client/drivers/main.py index 96a0b71..43112fe 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -9,6 +9,7 @@ ## SECTION: Imports + from PiicoDev_Unified import sleep_ms from PiicoDev_Switch import * from PiicoDev_SSD1306 import * @@ -24,7 +25,6 @@ from data.routines import * - ## SECTION: Global constants CUSHION_GPIO_PIN = 8 @@ -45,33 +45,35 @@ LOGOUT_SUCCESS_DELAY = 3000 """ Number of milliseconds between the user successfully logging out and returning to main(). """ -GET_POSTURE_DATA_TIMEOUT = timedelta(milliseconds = 2000) # DEBUG: Change this value up to ~60000 later. +GET_POSTURE_DATA_TIMEOUT = timedelta( + milliseconds=2000 +) # DEBUG: Change this value up to ~60000 later. """ Minimum delay between reading posture data from the SQLite database, in do_everything(). """ PROPORTION_IN_FRAME_THRESHOLD = 0.3 """ Proportion of the time the user must be in frame for any feedback to be given. FIXME: Fine-tune this value later. """ -HANDLE_CUSHION_FEEDBACK_TIMEOUT = timedelta(milliseconds = 5000) +HANDLE_CUSHION_FEEDBACK_TIMEOUT = timedelta(milliseconds=5000) """ Minimum delay between consecutive uses of the vibration motor. Used in handle_feedback(). """ -CUSHION_ACTIVE_INTERVAL = timedelta(milliseconds = 1000) +CUSHION_ACTIVE_INTERVAL = timedelta(milliseconds=1000) """ Length of time for which the vibration motor should vibrate. Used in handle_cushion_feedback(). """ CUSHION_PROPORTION_GOOD_THRESHOLD = 0.5 """ Threshold for vibration cushion feedback. If the proportion of "good" sitting posture is below this, the cushion will vibrate. """ -HANDLE_PLANT_FEEDBACK_TIMEOUT = timedelta(milliseconds = 10000) +HANDLE_PLANT_FEEDBACK_TIMEOUT = timedelta(milliseconds=10000) """ Minimum delay between consecutive uses of the plant-controlling servos. Used in handle_feedback(). """ -HANDLE_SNIFF_FEEDBACK_TIMEOUT = timedelta(milliseconds = 20000) +HANDLE_SNIFF_FEEDBACK_TIMEOUT = timedelta(milliseconds=20000) """ Minimum delay between consecutive uses of the scent bottle-controlling servos. Used in handle_feedback(). """ DEBUG_DO_EVERYTHING_INTERVAL = 1000 """ DEBUG Number of milliseconds between each loop iteration in do_everything(). """ - ## SECTION: main() + def main(): """ Entry point for the control program. @@ -80,7 +82,7 @@ def main(): print(" main()") # :DEBUG - global hardware + global hardware hardware = initialise_hardware() init_database() @@ -95,9 +97,9 @@ def main(): do_everything(main_data) - ## SECTION: Hardware initialisation + # 2024-09-01_15-29 Gabe: TESTED. for buttons and OLED display. def initialise_hardware() -> HardwareComponents: """ @@ -112,10 +114,10 @@ def initialise_hardware() -> HardwareComponents: TODO: Complete the function with all of the hardware peripherals (incrementally, as they get integrated). """ - print(" initialise_hardware()") # DEBUG + print(" initialise_hardware()") # DEBUG return_me = HardwareComponents.make_fresh() # Clear button queues - return_me.button0.was_pressed + return_me.button0.was_pressed return_me.button1.was_pressed # Set up GPIO pins GPIO.setmode(GPIO.BCM) # Same pin numbering convention as the PiicoDev header @@ -123,20 +125,20 @@ def initialise_hardware() -> HardwareComponents: # Write low to stop buzzer from mistakenly buzzing, if necessary GPIO.output(CUSHION_GPIO_PIN, GPIO.LOW) - print(" initialise_hardware() FINISHED") # DEBUG + print(" initialise_hardware() FINISHED") # DEBUG return return_me - ## SECTION: Login handling + # 2024-09-01_15-52 Gabe: TESTED. def wait_for_login_attempt() -> bool: """ Waits until the user attempts to log in. Returns: - (bool): True when the user attempts to log in. + (bool): True when the user attempts to log in. """ print(" BEGIN wait_for_login_attempt()") @@ -153,10 +155,11 @@ def wait_for_login_attempt() -> bool: # Clear the display hardware.display.fill(0) hardware.display.show() - print(" END wait_for_login_attempt()") # DEBUG + print(" END wait_for_login_attempt()") # DEBUG return True sleep_ms(WAIT_FOR_LOGIN_POLLING_INTERVAL) + # 2024-09-01 17:06 Gabe: TESTED., assuming ai_bros_face_recogniser() does what it should do. def attempt_login() -> ControlledData: """ @@ -185,32 +188,37 @@ def attempt_login() -> ControlledData: hardware.display.show() picture = take_picture() if picture.failed: - print(' Picture Failed') # DEBUG + print(" Picture Failed") # DEBUG hardware.display.fill(0) hardware.oled_display_text(PICTURE_FAILED_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(LOGIN_TAKE_PICTURE_INTERVAL) continue - face = ai_bros_face_recogniser(picture.underlying_picture) # TODO: This should be an external API call. + face = ai_bros_face_recogniser( + picture.underlying_picture + ) # TODO: This should be an external API call. if face.failed: - print(" AI has failed us") # DEBUG + print(" AI has failed us") # DEBUG hardware.display.fill(0) hardware.oled_display_text(AI_FAILED_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(LOGIN_TAKE_PICTURE_INTERVAL) continue if face.matched: - print(" Mega W for AI") # DEBUG + print(" Mega W for AI") # DEBUG return ControlledData.make_empty(face.user_id) elif ask_create_new_user(): - return ControlledData.make_empty(create_new_user(picture.underlying_picture)) + return ControlledData.make_empty( + create_new_user(picture.underlying_picture) + ) # Tell the user the login failed - print(" attempt_login(): Totally failed lol") # DEBUG + print(" attempt_login(): Totally failed lol") # DEBUG hardware.display.fill(0) hardware.oled_display_text(LOGIN_TOTALLY_FAILED_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(FAIL_LOGIN_DELAY) - return ControlledData.make_failed() + return ControlledData.make_failed() + def take_picture() -> Picture: """ @@ -224,6 +232,7 @@ def take_picture() -> Picture: # :DEBUG return DEBUG_return_value + # 2024-09-01 17:06 Gabe: TESTED. def ask_create_new_user() -> bool: """ @@ -237,7 +246,12 @@ def ask_create_new_user() -> bool: """ print(" BEGIN ask_create_new_user()") - CREATE_NEW_USER_MESSAGES = ["No face matched.", "Create new user?", "button0: no", "button1: yes"] + CREATE_NEW_USER_MESSAGES = [ + "No face matched.", + "Create new user?", + "button0: no", + "button1: yes", + ] # Display to screen hardware.display.fill(0) hardware.oled_display_texts(CREATE_NEW_USER_MESSAGES, 0, 0, 1) @@ -251,17 +265,18 @@ def ask_create_new_user() -> bool: # Clear the display hardware.display.fill(0) hardware.display.show() - print(" END ask_create_new_user(): do NOT create new user") # DEBUG + print(" END ask_create_new_user(): do NOT create new user") # DEBUG return False if hardware.button1.was_pressed: # Clear the display hardware.display.fill(0) hardware.display.show() - print(" END ask_create_new_user(): DO create new user") # DEBUG + print(" END ask_create_new_user(): DO create new user") # DEBUG return True sleep_ms(ASK_CREATE_NEW_USER_POLLING_INTERVAL) -def create_new_user(underlying_picture : int) -> int: + +def create_new_user(underlying_picture: int) -> int: """ Create a new user based on the given picture, and return their user id. @@ -284,13 +299,13 @@ def create_new_user(underlying_picture : int) -> int: # return new_user_id # DEBUG - ## SECTION: Control for the logged-in user + # 2024-09-02 07-03 Gabe: Currently working the following here: # top-level control flow # interaction with buttons and display -def do_everything(auspost : ControlledData) -> None: +def do_everything(auspost: ControlledData) -> None: """ Main control flow once a user is logged in. @@ -298,7 +313,7 @@ def do_everything(auspost : ControlledData) -> None: (auspost : ControlledData): Data encapsulating the current state of the program. Requires: ! auspost.is_failed() - + TODO: Actually implement this """ print(" BEGIN do_everything()") @@ -314,7 +329,7 @@ def do_everything(auspost : ControlledData) -> None: hardware.oled_display_text(LOGIN_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(LOGIN_SUCCESS_DELAY) - + # Clear button queues hardware.button0.was_pressed hardware.button1.was_pressed @@ -323,7 +338,7 @@ def do_everything(auspost : ControlledData) -> None: hardware.display.show() while True: - # Loop invariant: ! auspost.is_failed() + # Loop invariant: ! auspost.is_failed() # Check for user logout if hardware.button0.was_pressed: hardware.display.fill(0) @@ -331,8 +346,8 @@ def do_everything(auspost : ControlledData) -> None: hardware.display.show() sleep_ms(LOGOUT_SUCCESS_DELAY) print(" END do_everything()") - return - + return + # Probably should run individual threads for each of these # TODO: Move the threading to a more reasonable location. main() is probably best. # posture_monitoring_thread = threading.Thread(handle_posture_monitoring, args=(auspost)) @@ -344,15 +359,16 @@ def do_everything(auspost : ControlledData) -> None: sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL) + # 2024-09-13 11-32 Gabe: TESTED. -def update_display_screen(auspost : ControlledData) -> bool: +def update_display_screen(auspost: ControlledData) -> bool: """ Update the display screen with whatever needs to be on there. We will display: As per HardwareComponents.get_control_messages(), and Current-session posture graph - Args: + Args: (auspost : ControlledData): Data encapsulating the current state of the program. Returns: @@ -365,16 +381,21 @@ def update_display_screen(auspost : ControlledData) -> bool: print(" BEGIN update_display_screen()") hardware.display.fill(0) - hardware.oled_display_texts(hardware.get_control_messages(auspost.get_user_id()), 0, 0, 1) + hardware.oled_display_texts( + hardware.get_control_messages(auspost.get_user_id()), 0, 0, 1 + ) if not auspost.get_posture_data().empty(): - hardware.display.updateGraph2D(hardware.posture_graph, auspost.get_posture_data().get_nowait()) + hardware.display.updateGraph2D( + hardware.posture_graph, auspost.get_posture_data().get_nowait() + ) # Render hardware.display.show() print(" END update_display_screen()") return True -def handle_posture_monitoring(auspost : ControlledData) -> bool: + +def handle_posture_monitoring(auspost: ControlledData) -> bool: """ Take a snapshot monitoring the user, and update the given ControlledData if necessary. @@ -386,7 +407,7 @@ def handle_posture_monitoring(auspost : ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - + TODO: Implement error handling WARNING: UNTESTED! """ @@ -394,20 +415,23 @@ def handle_posture_monitoring(auspost : ControlledData) -> bool: print(" handle_posture_monitoring()") # :DEBUG now = datetime.now() - if (now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT): + if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: # TODO: The ai_bros_get_posture_data() call might fail once it's implemented properly. # If it does, we need to handle it properly. - auspost.accept_new_posture_data(ai_bros_get_posture_data(auspost.get_last_snapshot_time())) + auspost.accept_new_posture_data( + ai_bros_get_posture_data(auspost.get_last_snapshot_time()) + ) # DEBUG: auspost.accept_new_posture_data([auspost.DEBUG_get_next_posture_graph_value()]) # :DEBUG auspost.set_last_snapshot_time(now) return True -def handle_feedback(auspost : ControlledData) -> bool: + +def handle_feedback(auspost: ControlledData) -> bool: """ Provide feedback to the user if necessary. - + Args: (auspost : ControlledData): Data encapsulating the current state of the program. Returns: @@ -417,27 +441,30 @@ def handle_feedback(auspost : ControlledData) -> bool: Ensures: ! auspost.is_failed() """ - if (datetime.now() > auspost.get_last_cushion_time() + HANDLE_CUSHION_FEEDBACK_TIMEOUT): + if ( + datetime.now() + > auspost.get_last_cushion_time() + HANDLE_CUSHION_FEEDBACK_TIMEOUT + ): if not handle_cushion_feedback(auspost): return False - if (datetime.now() > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT): + if datetime.now() > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: if not handle_plant_feedback(auspost): return False - if (datetime.now() > auspost.get_last_sniff_time() + HANDLE_SNIFF_FEEDBACK_TIMEOUT): + if datetime.now() > auspost.get_last_sniff_time() + HANDLE_SNIFF_FEEDBACK_TIMEOUT: if not handle_sniff_feedback(auspost): return False - - return True + return True ## SECTION: Feedback handling + # 2024-09-15_20-18 Gabe: TESTED. -def handle_cushion_feedback(auspost : ControlledData) -> bool: +def handle_cushion_feedback(auspost: ControlledData) -> bool: """ Vibrate cushion (if necessary), and update the timestamp of when cushion feedback was last given. - + Args: (auspost : ControlledData): Data encapsulating the current state of the program. Returns: @@ -446,7 +473,7 @@ def handle_cushion_feedback(auspost : ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - + TODO: Implement this method. Currently prints a debug statement and updates the time. """ # DEBUG: @@ -456,12 +483,12 @@ def handle_cushion_feedback(auspost : ControlledData) -> bool: # Load posture records within the last HANDLE_CUSHION_FEEDBACK_TIMEOUT now = datetime.now() recent_posture_data = get_user_postures( - auspost.get_user_id(), - num = -1, - period_start = now - HANDLE_CUSHION_FEEDBACK_TIMEOUT, - period_end = now + auspost.get_user_id(), + num=-1, + period_start=now - HANDLE_CUSHION_FEEDBACK_TIMEOUT, + period_end=now, ) - + # Conditions for exiting early # 2024-09-15_19-47 Gabe: TESTED. if len(recent_posture_data) == 0: @@ -469,13 +496,19 @@ def handle_cushion_feedback(auspost : ControlledData) -> bool: auspost.set_last_cushion_time(datetime.now()) return True # 2024-09-15_20-18 Gabe: TESTED. - average_prop_in_frame = sum([posture.prop_in_frame for posture in recent_posture_data]) / len(recent_posture_data) + average_prop_in_frame = sum( + [posture.prop_in_frame for posture in recent_posture_data] + ) / len(recent_posture_data) if average_prop_in_frame < PROPORTION_IN_FRAME_THRESHOLD: - print(" Exiting handle_cushion_feedback() early: Not in frame for a high enough proportion of time.") + print( + " Exiting handle_cushion_feedback() early: Not in frame for a high enough proportion of time." + ) auspost.set_last_cushion_time(datetime.now()) return True # 2024-09-15_20-18 Gabe: TESTED. - average_prop_good = sum([posture.prop_good for posture in recent_posture_data]) / len(recent_posture_data) + average_prop_good = sum( + [posture.prop_good for posture in recent_posture_data] + ) / len(recent_posture_data) if average_prop_good >= CUSHION_PROPORTION_GOOD_THRESHOLD: print(" Exiting handle_cushion_feedback() early: You sat well :)") auspost.set_last_cushion_time(datetime.now()) @@ -494,7 +527,8 @@ def handle_cushion_feedback(auspost : ControlledData) -> bool: auspost.set_last_cushion_time(datetime.now()) return True -def handle_plant_feedback(auspost : ControlledData) -> bool: + +def handle_plant_feedback(auspost: ControlledData) -> bool: """ Set the plant height according to short-term current session data, and update the timestamp of when plant feedback was last given. @@ -507,7 +541,7 @@ def handle_plant_feedback(auspost : ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - + TODO: Implement this method. Currently prints a debug statement and updates the time. """ # DEBUG: @@ -516,7 +550,8 @@ def handle_plant_feedback(auspost : ControlledData) -> bool: auspost.set_last_plant_time(datetime.now()) return True -def handle_sniff_feedback(auspost : ControlledData) -> bool: + +def handle_sniff_feedback(auspost: ControlledData) -> bool: """ Dispense olfactory reward (if necessary), and update the timestamp of when olfactory feedback was last given. @@ -529,7 +564,7 @@ def handle_sniff_feedback(auspost : ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - + TODO: Implement this method. Currently prints a debug statement and updates the time. """ # DEBUG: @@ -539,7 +574,6 @@ def handle_sniff_feedback(auspost : ControlledData) -> bool: return True - ## LAUNCH if __name__ == "__main__":