From 844cb17eee6fa8c0df401eec923ebcd82db96572 Mon Sep 17 00:00:00 2001 From: MitchellJC <81349046+MitchellJC@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:31:18 +1000 Subject: [PATCH 01/11] feat: added function for getting next user id --- client/data/routines.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/client/data/routines.py b/client/data/routines.py index 7aab146..1535027 100644 --- a/client/data/routines.py +++ b/client/data/routines.py @@ -93,6 +93,19 @@ def create_user() -> int: return user_id +def next_user_id() -> int: + """ + Returns: + The id that would be assigned to a new user if one was created + """ + with _connect() as connection: + cursor = connection.cursor() + result = cursor.execute("SELECT last_insert_rowid() FROM user;") + ids = result.fetchone() + last_user_id = 0 if len(ids) == 0 else ids[0] + return last_user_id + 1 + + def save_posture(posture: Posture) -> None: """Stores the posture record in the database. From 088aea7bc755565895b26f8c3324c2844757b13d Mon Sep 17 00:00:00 2001 From: MitchellJC <81349046+MitchellJC@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:31:51 +1000 Subject: [PATCH 02/11] feat: implemented login system driver --- client/drivers/login_system.py | 135 +++++++++++++++++++++++++++++++-- 1 file changed, 129 insertions(+), 6 deletions(-) diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index 386e3ca..9c047e8 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -2,14 +2,137 @@ Login system """ +import numpy as np -def handle_authentication() -> int: - pass +from data.routines import next_user_id, create_user +from models.pose_detection.frame_capturer import RaspCapturer +from models.face_recognition.recognition import register_faces, get_face_match, Status +from drivers.data_structures import HardwareComponents, LEFT_BUTTON, RIGHT_BUTTON +NUM_FACES = 5 +QUIT = -4 +BAD_STATUS_MESSAGES = { + Status.NO_FACES.value: "No face detected please try again", + Status.TOO_MANY_FACES.value: "Too many faces detected please try again", +} +QUIT_INSTRUCTIONS = "Right button to quit" -def handle_register() -> int: - pass +def handle_authentication(hardware: HardwareComponents) -> int: + """Run authentication loop until user either registers or logs in. -def handle_login() -> int: - pass + Args: + hardware: connected RPI hardware + + Returns: + id of logged in user. + """ + while True: + hardware.send_message("Left button to login\nRight button to register") + button = hardware.wait_for_button_press() + + if button == RIGHT_BUTTON: + status = handle_register(hardware) + + if button == LEFT_BUTTON: + status = handle_login(hardware) + + if _is_status_id(status): + return status + + if status != QUIT: + error = ValueError(f"Did not expect status: {status}") + hardware.send_message(str(error)) + raise error + + +def handle_register(hardware: HardwareComponents) -> int: + """Tries to register a new user until successful. + + Args: + hardware: Connected RPI hardware + + Returns: + User id of new registered user or -4 if the user quits + """ + while True: + status = _attempt_register(hardware) + + if status == QUIT: + return QUIT + + if _is_status_id(status): + return status + + +def handle_login(hardware: HardwareComponents) -> int: + """Tries to login a new user until successful. + + Args: + hardware: Connected RPI hardware + + Returns: + User id of logged in user, -4 if the user quits. + """ + while True: + status = _attempt_login(hardware) + + if status == QUIT: + return QUIT + + if _is_status_id(status): + return status + + +def _attempt_login(hardware: HardwareComponents) -> int: + capturer = RaspCapturer() + hardware.send_message(f"Press left button to take photo\n{QUIT_INSTRUCTIONS}") + + button_pressed = hardware.wait_for_button_press() + if button_pressed == LEFT_BUTTON: + face, _ = capturer.get_frame() + if button_pressed == RIGHT_BUTTON: + return QUIT + + status = get_face_match(face) + _send_bad_status(hardware, status) + + return status + + +def _attempt_register(hardware: HardwareComponents) -> int: + capturer = RaspCapturer() + faces: list[np.ndarray] = [] + for i in range(NUM_FACES): + hardware.send_message( + f"Press left button to take photo {i + 1}/{NUM_FACES}\n{QUIT_INSTRUCTIONS}" + ) + + button_pressed = hardware.wait_for_button_press() + if button_pressed == RIGHT_BUTTON: + return QUIT + if button_pressed == LEFT_BUTTON: + frame, _ = capturer.get_frame() + faces.append(frame) + + hardware.send_message("Attempting register...") + user_id = next_user_id() + status = register_faces(user_id, faces) + + if status == Status.OK.value: + create_user() + hardware.send_message("Registration successful!") + return user_id + + _send_bad_status(hardware, status) + + return status + + +def _is_status_id(status: int) -> bool: + return status > 0 + + +def _send_bad_status(hardware: HardwareComponents, status: int) -> None: + if status in BAD_STATUS_MESSAGES: + hardware.send_message(BAD_STATUS_MESSAGES[status]) From 0a844422696c00fff420f2f17a86318f4f949521 Mon Sep 17 00:00:00 2001 From: MitchellJC <81349046+MitchellJC@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:33:31 +1000 Subject: [PATCH 03/11] feat,style: added functionality for waiting for button press Format changes from running black on file --- client/drivers/data_structures.py | 268 ++++++++++++++++++------------ 1 file changed, 162 insertions(+), 106 deletions(-) diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index 5955d99..05b1945 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -4,11 +4,11 @@ File: sitting-desktop-garden/client/drivers/data_structures.py Author: - Gabriel Field (47484306) + Gabriel Field (47484306), Mitchell Clark """ ## SECTION: Imports - +import time from typing import List from PiicoDev_Switch import PiicoDev_Switch from PiicoDev_SSD1306 import * @@ -18,6 +18,7 @@ # DEBUG: from math import sin, pi + # :DEBUG @@ -26,10 +27,13 @@ """ Sentinel value for an invalid user. """ EMPTY_USER_ID = -1 +LEFT_BUTTON = 0 +RIGHT_BUTTON = 1 ## SECTION: ControlledData + class ControlledData: """ Data for passing around in client/drivers/main.do_everything(). @@ -37,7 +41,7 @@ class ControlledData: There should only ever be one object of this class at a time. Member fields: - self._failed : bool + self._failed : bool True if this data is incomplete. self._user_id : int ID of current user. @@ -57,19 +61,19 @@ class ControlledData: self._failed ==> (all other variables are default values) """ - _failed : bool + _failed: bool """True if this data is incomplete.""" - _user_id : int + _user_id: int """ID of current user.""" - _posture_data : Queue[float] + _posture_data: Queue[float] """Data updated through ML models, used for feedback.""" - _last_snapshot_time : datetime + _last_snapshot_time: datetime """Time of the last successful pull of posture data from the SQLite database""" - _last_cushion_time : datetime + _last_cushion_time: datetime """Time of the last successful cushion feedback event.""" - _last_plant_time : datetime + _last_plant_time: datetime """Time of the last successful plant feedback event.""" - _last_sniff_time : datetime + _last_sniff_time: datetime """Time of the last successful scent feedback event.""" # SECTION: Constructors @@ -78,32 +82,34 @@ def __init__(self): """ DO NOT USE THIS CONSTRUCTOR! Call ControlledData.make_empty() or ControlledData.make_failed() instead. """ - self._failed = True - self._user_id = EMPTY_USER_ID - self._posture_data = Queue() + self._failed = True + self._user_id = EMPTY_USER_ID + self._posture_data = Queue() self._last_snapshot_time = datetime.now() - self._last_cushion_time = datetime.now() - self._last_plant_time = datetime.now() - self._last_sniff_time = datetime.now() + self._last_cushion_time = datetime.now() + self._last_plant_time = datetime.now() + self._last_sniff_time = datetime.now() self._DEBUG_current_graph_list_index = 0 - self._DEBUG_current_graph_function = lambda x: 30 * (1 + sin(2 * pi * x / WIDTH)) + self._DEBUG_current_graph_function = lambda x: 30 * ( + 1 + sin(2 * pi * x / WIDTH) + ) @classmethod - def make_empty(cls, user_id : int) -> "ControlledData": + def make_empty(cls, user_id: int) -> "ControlledData": """ Construct a non-failed object of this class, with a provided user ID and empty posture data. - + Returns: (ControlledData): An object of this class that is not failed, with legal user ID and empty posture data. """ return_me = ControlledData() - return_me._failed = False - return_me._user_id = user_id - return_me._posture_data = Queue() + return_me._failed = False + return_me._user_id = user_id + return_me._posture_data = Queue() return_me._last_snapshot_time = datetime.now() - return_me._last_cushion_time = datetime.now() - return_me._last_plant_time = datetime.now() - return_me._last_sniff_time = datetime.now() + return_me._last_cushion_time = datetime.now() + return_me._last_plant_time = datetime.now() + return_me._last_sniff_time = datetime.now() print(" Made a new empty ControlledData() with user_id", return_me._user_id) return return_me @@ -111,18 +117,18 @@ def make_empty(cls, user_id : int) -> "ControlledData": def make_failed(cls) -> "ControlledData": """ Construct a failed object of this class. - + Returns: (ControlledData): An object of this class that is failed. """ return_me = ControlledData() - return_me._failed = True - return_me._user_id = EMPTY_USER_ID - return_me._posture_data = Queue() + return_me._failed = True + return_me._user_id = EMPTY_USER_ID + return_me._posture_data = Queue() return_me._last_snapshot_time = datetime.now() - return_me._last_cushion_time = datetime.now() - return_me._last_plant_time = datetime.now() - return_me._last_sniff_time = datetime.now() + return_me._last_cushion_time = datetime.now() + return_me._last_plant_time = datetime.now() + return_me._last_sniff_time = datetime.now() return return_me # SECTION: Getters/Setters @@ -136,7 +142,9 @@ def DEBUG_get_next_posture_graph_value(self) -> int: TODO: Remove this method """ - return_me = self._DEBUG_current_graph_function(self._DEBUG_current_graph_list_index) + return_me = self._DEBUG_current_graph_function( + self._DEBUG_current_graph_list_index + ) self._DEBUG_current_graph_list_index += 1 return return_me @@ -146,7 +154,7 @@ def is_failed(self) -> bool: (bool): True iff this ControlledData is failed. """ return self._failed - + def get_user_id(self) -> int: """ Returns: @@ -167,8 +175,8 @@ def get_last_snapshot_time(self) -> datetime: (datetime): The last time that the internal posture data was updated. """ return self._last_snapshot_time - - def set_last_snapshot_time(self, time : datetime) -> None: + + def set_last_snapshot_time(self, time: datetime) -> None: """ Args: time : datetime @@ -183,14 +191,14 @@ def get_last_cushion_time(self) -> datetime: """ return self._last_cushion_time - def set_last_cushion_time(self, time : datetime) -> None: + def set_last_cushion_time(self, time: datetime) -> None: """ Args: time : datetime The last time that the user was provided cushion feedback. """ self._last_cushion_time = time - + def get_last_plant_time(self) -> datetime: """ Returns: @@ -198,14 +206,14 @@ def get_last_plant_time(self) -> datetime: """ return self._last_plant_time - def set_last_plant_time(self, time : datetime) -> None: + def set_last_plant_time(self, time: datetime) -> None: """ Args: time : datetime The last time that the user was provided plant feedback. """ self._last_plant_time = time - + def get_last_sniff_time(self) -> datetime: """ Returns: @@ -213,22 +221,24 @@ def get_last_sniff_time(self) -> datetime: """ return self._last_cushion_time - def set_last_sniff_time(self, time : datetime) -> None: + def set_last_sniff_time(self, time: datetime) -> None: """ Args: time : datetime The last time that the user was provided olfactory feedback. """ self._last_sniff_time = time - - def accept_new_posture_data(self, posture_data : List[float]) -> None: # TODO: Refine type signature + + def accept_new_posture_data( + self, posture_data: List[float] + ) -> None: # TODO: Refine type signature """ Update the internal store of posture data for the OLED display. Args: posture_data : List[float] New posture data to accept and merge with the current state of this object. - + TODO: Implement me! """ # DEBUG: @@ -237,10 +247,11 @@ def accept_new_posture_data(self, posture_data : List[float]) -> None: # TODO: R for datum in posture_data: self._posture_data.put_nowait(datum) - # SECTION: Posture data mapping - - def get_cushion_posture_data(self) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like + + def get_cushion_posture_data( + self, + ) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like """ Returns: (CUSHION_POSTURE_DATA): Posture data necessary for cushion feedback. @@ -251,7 +262,9 @@ def get_cushion_posture_data(self) -> "CUSHION_POSTURE_DATA": # TODO: Decide wha # :DEBUG return None - def get_plant_posture_data(self) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like + def get_plant_posture_data( + self, + ) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like """ Returns: (PLANT_POSTURE_DATA) Posture data necessary for plant feedback. @@ -261,8 +274,10 @@ def get_plant_posture_data(self) -> "PLANT_POSTURE_DATA": # TODO: Decide what th print(" WARNING: get_plant_posture_data() not implemented!") # :DEBUG return None - - def get_sniff_posture_data(self) -> "SNIFF_POSTURE_DATA": # TODO: Decide what this type looks like + + def get_sniff_posture_data( + self, + ) -> "SNIFF_POSTURE_DATA": # TODO: Decide what this type looks like """ Returns: (SNIFF_POSTURE_DATA): Posture data necessary for scent feedback. @@ -274,9 +289,9 @@ def get_sniff_posture_data(self) -> "SNIFF_POSTURE_DATA": # TODO: Decide what th return None - ## SECTION: Hardware packaged together + class HardwareComponents: """ Hardware components packaged together into a class. @@ -293,46 +308,50 @@ class HardwareComponents: Should get initialised ONCE THE USER IS LOGGED IN because the graph will look different for each user. """ - button0 : PiicoDev_Switch + button0: PiicoDev_Switch """A button with address switches set to [0, 0, 0, 0]""" - button1 : PiicoDev_Switch + button1: PiicoDev_Switch """A button with address switches set to [0, 0, 0, 1]""" - display : PiicoDev_SSD1306 + display: PiicoDev_SSD1306 """OLED SSD1306 Display with default address""" - posture_graph : PiicoDev_SSD1306.graph2D | None + posture_graph: PiicoDev_SSD1306.graph2D | None """ Graph object for rendering on self.display. NOT INITIALISED by default; i.e. None until initialised. Should get initialised ONCE THE USER IS LOGGED IN because the graph will look different for each user. """ - posture_graph_from : int | None + posture_graph_from: int | None """y-coordinate from which the posture graph begins, or `None` if no posture graph is active.""" # SECTION: Constructors - + @classmethod def make_fresh(cls): """ Create a new instance of HardwareComponents, set up according to the hardware that we expect to be plugged in. """ - DOUBLE_PRESS_DURATION = 400 # Milliseconds + DOUBLE_PRESS_DURATION = 400 # Milliseconds return HardwareComponents( - PiicoDev_Switch(id = [0, 0, 0, 0], double_press_duration = DOUBLE_PRESS_DURATION), # WARNING: 2024-09-01 17:12 Gabe: I think this produces an "I2C is not enabled" warning. No idea why. - PiicoDev_Switch(id = [0, 0, 0, 1], double_press_duration = DOUBLE_PRESS_DURATION), # WARNING: 2024-09-01 17:12 Gabe: I think this produces an "I2C is not enabled" warning. No idea why. - create_PiicoDev_SSD1306() # This is the constructor; ignore the "is not defined" error message. + PiicoDev_Switch( + id=[0, 0, 0, 0], double_press_duration=DOUBLE_PRESS_DURATION + ), # WARNING: 2024-09-01 17:12 Gabe: I think this produces an "I2C is not enabled" warning. No idea why. + PiicoDev_Switch( + id=[0, 0, 0, 1], double_press_duration=DOUBLE_PRESS_DURATION + ), # WARNING: 2024-09-01 17:12 Gabe: I think this produces an "I2C is not enabled" warning. No idea why. + create_PiicoDev_SSD1306(), # This is the constructor; ignore the "is not defined" error message. ) def __init__(self, button0, button1, display): - self.button0 : PiicoDev_Switch = button0 - self.button1 : PiicoDev_Switch = button1 - self.display : PiicoDev_SSD1306 = display - self.posture_graph : PiicoDev_SSD1306.graph2D | None = None - self.posture_graph_from : int | None = None - + self.button0: PiicoDev_Switch = button0 + self.button1: PiicoDev_Switch = button1 + self.display: PiicoDev_SSD1306 = display + self.posture_graph: PiicoDev_SSD1306.graph2D | None = None + self.posture_graph_from: int | None = None + # SECTION: Setters # 2024-09-02 14-45 Gabe: TESTED. - def get_control_messages(self, user_id : int) -> List[str]: + def get_control_messages(self, user_id: int) -> List[str]: """ Get messages to display during usual application loop. TODO: Finalise these! @@ -340,7 +359,7 @@ def get_control_messages(self, user_id : int) -> List[str]: Args: user_id : int ID of the currently logged-in user. - + Returns: (List[str]): The messages to display to the user during the main application loop. @@ -348,7 +367,7 @@ def get_control_messages(self, user_id : int) -> List[str]: return ["b0: logout", "id: " + str(user_id)] # 2024-09-13 08-31 Gabe: TESTED. - def initialise_posture_graph(self, user_id : int) -> None: + def initialise_posture_graph(self, user_id: int) -> None: """ Initialise self.posture_graph according to the provided user_id. @@ -358,31 +377,42 @@ def initialise_posture_graph(self, user_id : int) -> None: """ CONTROL_MESSAGES = self.get_control_messages(user_id) GRAPH_MIN_VALUE = 0 - GRAPH_MAX_VALUE = 60 # TODO: 2024-09-02 07-53 Gabe: - # This needs to be a real value for the underlying data that we expect to be shown. - # From memory, this is probably `60` for "number of the last 60 seconds spent sitting well" - LINE_HEIGHT = 15 # pixels - LINE_WIDTH = 16 # characters - + GRAPH_MAX_VALUE = 60 # TODO: 2024-09-02 07-53 Gabe: + # This needs to be a real value for the underlying data that we expect to be shown. + # From memory, this is probably `60` for "number of the last 60 seconds spent sitting well" + LINE_HEIGHT = 15 # pixels + LINE_WIDTH = 16 # characters + # The posture graph will occupy space from the bottom (y = HEIGHT - 1) up to initialisation_y_value. flatten = lambda xss: [x for xs in xss for x in xs] - it = flatten(map( - (lambda me: [me[i : i + LINE_WIDTH] for i in range(0, len(CONTROL_MESSAGES), LINE_WIDTH)]), - CONTROL_MESSAGES - )) + it = flatten( + map( + ( + lambda me: [ + me[i : i + LINE_WIDTH] + for i in range(0, len(CONTROL_MESSAGES), LINE_WIDTH) + ] + ), + CONTROL_MESSAGES, + ) + ) initialisation_y_value = len(it) * LINE_HEIGHT self.posture_graph_from = initialisation_y_value self.posture_graph = self.display.graph2D( - originX = 0, originY = HEIGHT - 1, - width = WIDTH, height = HEIGHT - initialisation_y_value, - minValue = GRAPH_MIN_VALUE, maxValue = GRAPH_MAX_VALUE, - c = 1, bars = False + originX=0, + originY=HEIGHT - 1, + width=WIDTH, + height=HEIGHT - initialisation_y_value, + minValue=GRAPH_MIN_VALUE, + maxValue=GRAPH_MAX_VALUE, + c=1, + bars=False, ) - + # SECTION: Using peripherals # 2024-09-01 16:57 Gabe: TESTED. - def oled_display_text(self, text : str, x : int, y : int, colour : int = 1) -> int: + def oled_display_text(self, text: str, x: int, y: int, colour: int = 1) -> int: """ Display text on the oled display, wrapping lines if necessary. NOTE: Does not blank display. Call `.display.fill(0)` if needed. @@ -399,19 +429,19 @@ def oled_display_text(self, text : str, x : int, y : int, colour : int = 1) -> i 0: black 1: white Defaults to 1: white. - + Returns: (int): The y-value at which any subsequent lines should start printing from. """ - LINE_HEIGHT = 15 # pixels - LINE_WIDTH = 16 # characters + LINE_HEIGHT = 15 # pixels + LINE_WIDTH = 16 # characters chunks = [text[i : i + LINE_WIDTH] for i in range(0, len(text), LINE_WIDTH)] - for (index, chunk) in enumerate(chunks): + for index, chunk in enumerate(chunks): self.display.text(chunk, x, y + index * LINE_HEIGHT, colour) return y + len(chunks) * LINE_HEIGHT - + # 2024-09-01 17:12 Gabe: TESTED. - def oled_display_texts(self, texts: List[str], x : int, y : int, colour : int) -> int: + def oled_display_texts(self, texts: List[str], x: int, y: int, colour: int) -> int: """ Display many lines of text on the oled display, wrapping lines if necessary. NOTE: Does not blank display. Call `.display.fill(0)` if needed. @@ -427,19 +457,45 @@ def oled_display_texts(self, texts: List[str], x : int, y : int, colour : int) - colour : int 0: black 1: white - + Returns: (int): The y-value at which any subsequent lines should start printing from. """ display_height_offset = 0 for text in texts: - display_height_offset = self.oled_display_text(text, x, y + display_height_offset, colour) + display_height_offset = self.oled_display_text( + text, x, y + display_height_offset, colour + ) return display_height_offset + def send_message(self, message: str, message_time: int = 1) -> None: + """Clear the screen and display message + + Args: + message: Message to send to the user + """ + self.display.fill(0) + self.oled_display_text(message, 0, 0, 1) + self.display.show() + time.sleep(message_time) + + def wait_for_button_press(self) -> int: + """Waits for a button to be pressed and then returns the button number. + + Returns: + The number of the button pressed. + """ + while True: + if self.button0.was_pressed: + return LEFT_BUTTON + + if self.button1.was_pressed: + return RIGHT_BUTTON ## SECTION: Picture + class Picture: """ A picture, which may have failed. @@ -452,8 +508,8 @@ class Picture: """ def __init__(self) -> "Picture": - self.failed : bool = True - self.underlying_picture : "UNDERLYING_PICTURE" = None + self.failed: bool = True + self.underlying_picture: "UNDERLYING_PICTURE" = None @classmethod def make_failed(cls) -> "Picture": @@ -464,9 +520,9 @@ def make_failed(cls) -> "Picture": return_me.failed = True return_me.underlying_picture = None return return_me - + @classmethod - def make_valid(cls, underlying_picture : "UNDERLYING_PICTURE") -> "Picture": + def make_valid(cls, underlying_picture: "UNDERLYING_PICTURE") -> "Picture": """ Make a valid Picture. """ @@ -476,9 +532,9 @@ def make_valid(cls, underlying_picture : "UNDERLYING_PICTURE") -> "Picture": return return_me - ## SECTION: Face + class Face: """ A potentially recognised face, which may have failed to match or failed to run. @@ -492,12 +548,12 @@ class Face: if not self.failed and self.matched, then this string will contain the user id of the matched user """ - + def __init__(self) -> "Face": - self.failed : bool = True - self.matched : bool = False - self.user_id : int = None - + self.failed: bool = True + self.matched: bool = False + self.user_id: int = None + @classmethod def make_failed(cls) -> "Face": """ @@ -508,7 +564,7 @@ def make_failed(cls) -> "Face": return_me.matched = False return_me.user_id = None return return_me - + @classmethod def make_unmatched(cls) -> "Face": """ @@ -521,7 +577,7 @@ def make_unmatched(cls) -> "Face": return return_me @classmethod - def make_matched(cls, user_id : int) -> "Face": + def make_matched(cls, user_id: int) -> "Face": """ Make a matched face. From 321aef7c46044322c2a5e96e96288f0bd7fdfb3d Mon Sep 17 00:00:00 2001 From: Mitchell Clark Date: Fri, 4 Oct 2024 18:53:46 +1000 Subject: [PATCH 04/11] feat: added bad status message for no face match --- client/drivers/login_system.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index 9c047e8..897d8b5 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -12,8 +12,9 @@ NUM_FACES = 5 QUIT = -4 BAD_STATUS_MESSAGES = { - Status.NO_FACES.value: "No face detected please try again", - Status.TOO_MANY_FACES.value: "Too many faces detected please try again", + Status.NO_FACES.value: "No face detected please", + Status.TOO_MANY_FACES.value: "Too many faces detected", + Status.NO_MATCH.value: "Could not match face", } QUIT_INSTRUCTIONS = "Right button to quit" From 03cf80627f53b8b9b2732fdf0a47142d933543ef Mon Sep 17 00:00:00 2001 From: Mitchell Clark Date: Fri, 4 Oct 2024 18:55:46 +1000 Subject: [PATCH 05/11] doc: added msg time arg in send_message docstring --- client/drivers/data_structures.py | 1 + 1 file changed, 1 insertion(+) diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index 05b1945..0ad84d4 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -473,6 +473,7 @@ def send_message(self, message: str, message_time: int = 1) -> None: Args: message: Message to send to the user + message_time: Time to sleep for after displaying message. """ self.display.fill(0) self.oled_display_text(message, 0, 0, 1) From e7c1248b388e81435af4e509908fecc5bdac0a97 Mon Sep 17 00:00:00 2001 From: Mitchell Clark Date: Sat, 5 Oct 2024 12:44:05 +1000 Subject: [PATCH 06/11] ref: Refactored login handles to _loop_action --- client/drivers/login_system.py | 56 ++++++++++++---------------------- 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index 897d8b5..eca46a5 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -2,6 +2,8 @@ Login system """ +from typing import Callable + import numpy as np from data.routines import next_user_id, create_user @@ -18,6 +20,8 @@ } QUIT_INSTRUCTIONS = "Right button to quit" +Action = Callable[[HardwareComponents], int] + def handle_authentication(hardware: HardwareComponents) -> int: """Run authentication loop until user either registers or logs in. @@ -33,10 +37,10 @@ def handle_authentication(hardware: HardwareComponents) -> int: button = hardware.wait_for_button_press() if button == RIGHT_BUTTON: - status = handle_register(hardware) + status = _loop_action(hardware, _attempt_register) if button == LEFT_BUTTON: - status = handle_login(hardware) + status = _loop_action(hardware, _attempt_login) if _is_status_id(status): return status @@ -47,36 +51,10 @@ def handle_authentication(hardware: HardwareComponents) -> int: raise error -def handle_register(hardware: HardwareComponents) -> int: - """Tries to register a new user until successful. - - Args: - hardware: Connected RPI hardware - - Returns: - User id of new registered user or -4 if the user quits - """ - while True: - status = _attempt_register(hardware) - - if status == QUIT: - return QUIT - - if _is_status_id(status): - return status - - -def handle_login(hardware: HardwareComponents) -> int: - """Tries to login a new user until successful. - - Args: - hardware: Connected RPI hardware - - Returns: - User id of logged in user, -4 if the user quits. - """ +def _loop_action(hardware: HardwareComponents, action: Action) -> int: + """Loop action until appropriate status is returned""" while True: - status = _attempt_login(hardware) + status = action(hardware) if status == QUIT: return QUIT @@ -92,31 +70,37 @@ def _attempt_login(hardware: HardwareComponents) -> int: button_pressed = hardware.wait_for_button_press() if button_pressed == LEFT_BUTTON: face, _ = capturer.get_frame() + if button_pressed == RIGHT_BUTTON: return QUIT status = get_face_match(face) - _send_bad_status(hardware, status) + _handle_status_message(hardware, status) return status def _attempt_register(hardware: HardwareComponents) -> int: capturer = RaspCapturer() + + # Capture NUM_FACES faces faces: list[np.ndarray] = [] for i in range(NUM_FACES): hardware.send_message( - f"Press left button to take photo {i + 1}/{NUM_FACES}\n{QUIT_INSTRUCTIONS}" + f"Press left button to take photo {i + 1}/{NUM_FACES}\n" + f"{QUIT_INSTRUCTIONS}" ) button_pressed = hardware.wait_for_button_press() if button_pressed == RIGHT_BUTTON: return QUIT + if button_pressed == LEFT_BUTTON: frame, _ = capturer.get_frame() faces.append(frame) - hardware.send_message("Attempting register...") + # Try register faces + hardware.send_message("Registering face...") user_id = next_user_id() status = register_faces(user_id, faces) @@ -125,7 +109,7 @@ def _attempt_register(hardware: HardwareComponents) -> int: hardware.send_message("Registration successful!") return user_id - _send_bad_status(hardware, status) + _handle_status_message(hardware, status) return status @@ -134,6 +118,6 @@ def _is_status_id(status: int) -> bool: return status > 0 -def _send_bad_status(hardware: HardwareComponents, status: int) -> None: +def _handle_status_message(hardware: HardwareComponents, status: int) -> None: if status in BAD_STATUS_MESSAGES: hardware.send_message(BAD_STATUS_MESSAGES[status]) From c717dcabf689a5aee0d2b1ddd2ba95b113e38b72 Mon Sep 17 00:00:00 2001 From: Mitchell Clark Date: Sat, 5 Oct 2024 13:00:01 +1000 Subject: [PATCH 07/11] ref: Integrated new login system into main --- client/drivers/main.py | 200 +++-------------------------------------- 1 file changed, 12 insertions(+), 188 deletions(-) diff --git a/client/drivers/main.py b/client/drivers/main.py index 43112fe..238fc05 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -4,11 +4,11 @@ File: sitting-desktop-garden/client/drivers/main.py Author: - Gabriel Field (47484306) + Gabriel Field (47484306), Mitchell Clark """ ## SECTION: Imports - +import logging from PiicoDev_Unified import sleep_ms from PiicoDev_Switch import * @@ -16,13 +16,12 @@ import RPi.GPIO as GPIO -from typing import Tuple -import threading from datetime import datetime, timedelta -from data_structures import ControlledData, HardwareComponents, Picture, Face +from drivers.data_structures import ControlledData, HardwareComponents, Picture from ai_bros import * from data.routines import * +from drivers.login_system import handle_authentication ## SECTION: Global constants @@ -70,6 +69,7 @@ DEBUG_DO_EVERYTHING_INTERVAL = 1000 """ DEBUG Number of milliseconds between each loop iteration in do_everything(). """ +logger = logging.getLogger(__name__) ## SECTION: main() @@ -78,23 +78,16 @@ def main(): """ Entry point for the control program. """ - # DEBUG: - print(" main()") - # :DEBUG - - global hardware - hardware = initialise_hardware() + logging.basicConfig(level=logging.DEBUG) + logger.debug("Running main") init_database() - # Top level control flow while True: - wait_for_login_attempt() - main_data = attempt_login() - if main_data.is_failed(): - continue - print(" main(): Successful login") - do_everything(main_data) + user_id = handle_authentication(hardware) + user = ControlledData.make_empty(user_id) + logger.debug("Login successful") + do_everything(user) ## SECTION: Hardware initialisation @@ -129,176 +122,6 @@ def initialise_hardware() -> HardwareComponents: return return_me -## SECTION: Login handling - - -# 2024-09-01_15-52 Gabe: TESTED. -def wait_for_login_attempt() -> bool: - """ - Waits until the user attempts to log in. - - Returns: - (bool): True when the user attempts to log in. - """ - print(" BEGIN wait_for_login_attempt()") - - WAIT_FOR_LOGIN_OLED_MESSAGE = "Press button0 to log in!" - # Display to screen - hardware.display.fill(0) - hardware.oled_display_text(WAIT_FOR_LOGIN_OLED_MESSAGE, 0, 0, 1) - hardware.display.show() - # Clear button queue - hardware.button0.was_pressed - - while True: - if hardware.button0.was_pressed: - # Clear the display - hardware.display.fill(0) - hardware.display.show() - print(" END wait_for_login_attempt()") # DEBUG - return True - sleep_ms(WAIT_FOR_LOGIN_POLLING_INTERVAL) - - -# 2024-09-01 17:06 Gabe: TESTED., assuming ai_bros_face_recogniser() does what it should do. -def attempt_login() -> ControlledData: - """ - Attempts to log in. - - Returns: - (ControlledData): which is: - FAILED if the login is unsuccessful - EMPTY (but not failed) if the login is successful - """ - - # TODO: Finalise these messages - SMILE_FOR_CAMERA_MESSAGE = "LIS: Smile for the camera!" - PICTURE_FAILED_MESSAGE = "LIS: Picture failed T-T" - AI_FAILED_MESSAGE = "LIS: Failed to determine user T-T" - LOGIN_TOTALLY_FAILED_MESSAGE = "LIS: Failed to log in T-T" - - hardware.display.fill(0) - hardware.oled_display_text(SMILE_FOR_CAMERA_MESSAGE, 0, 0, 1) - hardware.display.show() - sleep_ms(START_LOGIN_ATTEMPTS_DELAY) - - while True: - hardware.display.fill(0) - hardware.oled_display_text(SMILE_FOR_CAMERA_MESSAGE, 0, 0, 1) - hardware.display.show() - picture = take_picture() - if picture.failed: - print(" Picture Failed") # DEBUG - hardware.display.fill(0) - hardware.oled_display_text(PICTURE_FAILED_MESSAGE, 0, 0, 1) - hardware.display.show() - sleep_ms(LOGIN_TAKE_PICTURE_INTERVAL) - continue - face = ai_bros_face_recogniser( - picture.underlying_picture - ) # TODO: This should be an external API call. - if face.failed: - print(" AI has failed us") # DEBUG - hardware.display.fill(0) - hardware.oled_display_text(AI_FAILED_MESSAGE, 0, 0, 1) - hardware.display.show() - sleep_ms(LOGIN_TAKE_PICTURE_INTERVAL) - continue - if face.matched: - print(" Mega W for AI") # DEBUG - return ControlledData.make_empty(face.user_id) - elif ask_create_new_user(): - return ControlledData.make_empty( - create_new_user(picture.underlying_picture) - ) - # Tell the user the login failed - print(" attempt_login(): Totally failed lol") # DEBUG - hardware.display.fill(0) - hardware.oled_display_text(LOGIN_TOTALLY_FAILED_MESSAGE, 0, 0, 1) - hardware.display.show() - sleep_ms(FAIL_LOGIN_DELAY) - return ControlledData.make_failed() - - -def take_picture() -> Picture: - """ - Takes a picture from the camera, and returns a (failable) picture object. - - TODO: Actually write this function. Currently prints a debug message and returns a failed picture. - """ - # DEBUG: - print(" take_picture()") - DEBUG_return_value = Picture.make_valid("DEBUG_picture_goes_here") - # :DEBUG - return DEBUG_return_value - - -# 2024-09-01 17:06 Gabe: TESTED. -def ask_create_new_user() -> bool: - """ - Ask the user whether they would like to create a new user profile based on the previous picture. - - Returns: - (bool): True iff the user would like to create a new user profile - TODO: Make this go out to hardware peripherals. It should have: - Two buttons (yes / no) - The LED display ("Unmatched face. Create new user?") - """ - print(" BEGIN ask_create_new_user()") - - CREATE_NEW_USER_MESSAGES = [ - "No face matched.", - "Create new user?", - "button0: no", - "button1: yes", - ] - # Display to screen - hardware.display.fill(0) - hardware.oled_display_texts(CREATE_NEW_USER_MESSAGES, 0, 0, 1) - hardware.display.show() - # Clear button queue - hardware.button0.was_pressed - hardware.button1.was_pressed - - while True: - if hardware.button0.was_pressed: - # Clear the display - hardware.display.fill(0) - hardware.display.show() - print(" END ask_create_new_user(): do NOT create new user") # DEBUG - return False - if hardware.button1.was_pressed: - # Clear the display - hardware.display.fill(0) - hardware.display.show() - print(" END ask_create_new_user(): DO create new user") # DEBUG - return True - sleep_ms(ASK_CREATE_NEW_USER_POLLING_INTERVAL) - - -def create_new_user(underlying_picture: int) -> int: - """ - Create a new user based on the given picture, and return their user id. - - Args: - underlying_picture : UNDERLYING_PICTURE - Picture to associate with the new user profile - Returns: - (int): The new user's user id - TODO: Actually write to the local SQLite database, and properly determine the new user id. - """ - # DEBUG: - DEBUG_new_user_id = 0 - # new_user_id = create_user() - # try: - # register_faces(new_user_id, [underlying_picture]) - # except NotImplementedError: - # pass - # :DEBUG - return DEBUG_new_user_id - # return new_user_id # DEBUG - - ## SECTION: Control for the logged-in user @@ -575,6 +398,7 @@ def handle_sniff_feedback(auspost: ControlledData) -> bool: ## LAUNCH +hardware = initialise_hardware() if __name__ == "__main__": main() From c8a10c064d39b7eb998b844ccecee9fdcc1f74b9 Mon Sep 17 00:00:00 2001 From: Mitchell Clark Date: Sat, 5 Oct 2024 13:27:39 +1000 Subject: [PATCH 08/11] fix,feat: added clear buttons to stop premature actions from user --- client/drivers/data_structures.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index 0ad84d4..d19324e 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -486,6 +486,7 @@ def wait_for_button_press(self) -> int: Returns: The number of the button pressed. """ + self._clear_buttons() while True: if self.button0.was_pressed: return LEFT_BUTTON @@ -493,6 +494,12 @@ def wait_for_button_press(self) -> int: if self.button1.was_pressed: return RIGHT_BUTTON + def _clear_buttons(self) -> None: + self.button0.was_pressed + self.button1.was_pressed + self.button0.was_double_pressed + self.button1.was_double_pressed + ## SECTION: Picture From 016c5cf7e51f45d01010ad4d0ade4d5293995004 Mon Sep 17 00:00:00 2001 From: Mitchell Clark Date: Sun, 6 Oct 2024 10:39:34 +1100 Subject: [PATCH 09/11] ref: deleted Picture and Face classes --- client/drivers/data_structures.py | 99 ------------------------------- client/drivers/main.py | 2 +- 2 files changed, 1 insertion(+), 100 deletions(-) diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index d19324e..07e4354 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -499,102 +499,3 @@ def _clear_buttons(self) -> None: self.button1.was_pressed self.button0.was_double_pressed self.button1.was_double_pressed - - -## SECTION: Picture - - -class Picture: - """ - A picture, which may have failed. - - Member fields: - self.failed : bool - True iff this picture is incomplete. - self.underlying_picture : (TODO: Figure out this type!) - The picture encoded by this object. - """ - - def __init__(self) -> "Picture": - self.failed: bool = True - self.underlying_picture: "UNDERLYING_PICTURE" = None - - @classmethod - def make_failed(cls) -> "Picture": - """ - Make a failed Picture. - """ - return_me = Picture() - return_me.failed = True - return_me.underlying_picture = None - return return_me - - @classmethod - def make_valid(cls, underlying_picture: "UNDERLYING_PICTURE") -> "Picture": - """ - Make a valid Picture. - """ - return_me = Picture() - return_me.failed = False - return_me.underlying_picture = underlying_picture - return return_me - - -## SECTION: Face - - -class Face: - """ - A potentially recognised face, which may have failed to match or failed to run. - - Member fields: - self.failed : bool - True iff the facial recognition model failed - self.matched : bool - True iff the facial recognition model matched a face - self.user_id : int - if not self.failed and self.matched, then this string will contain the user - id of the matched user - """ - - def __init__(self) -> "Face": - self.failed: bool = True - self.matched: bool = False - self.user_id: int = None - - @classmethod - def make_failed(cls) -> "Face": - """ - Make a failed Face. - """ - return_me = Face() - return_me.failed = True - return_me.matched = False - return_me.user_id = None - return return_me - - @classmethod - def make_unmatched(cls) -> "Face": - """ - Make an unmatched Face. - """ - return_me = Face() - return_me.failed = False - return_me.matched = False - return_me.user_id = None - return return_me - - @classmethod - def make_matched(cls, user_id: int) -> "Face": - """ - Make a matched face. - - Args: - user_id : int - The matched user id - """ - return_me = Face() - return_me.failed = False - return_me.matched = True - return_me.user_id = user_id - return return_me diff --git a/client/drivers/main.py b/client/drivers/main.py index 238fc05..995b510 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -18,7 +18,7 @@ from datetime import datetime, timedelta -from drivers.data_structures import ControlledData, HardwareComponents, Picture +from drivers.data_structures import ControlledData, HardwareComponents from ai_bros import * from data.routines import * from drivers.login_system import handle_authentication From 4a034a20ba69f498a2ab68fa2da036dfd318046a Mon Sep 17 00:00:00 2001 From: Mitchell Clark Date: Sun, 6 Oct 2024 11:01:44 +1100 Subject: [PATCH 10/11] feat: added logging to login_system so that debugging is easier on rpi --- client/drivers/login_system.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index eca46a5..76eb1fd 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -2,6 +2,7 @@ Login system """ +import logging from typing import Callable import numpy as np @@ -22,6 +23,8 @@ Action = Callable[[HardwareComponents], int] +logger = logging.getLogger(__name__) + def handle_authentication(hardware: HardwareComponents) -> int: """Run authentication loop until user either registers or logs in. @@ -33,7 +36,7 @@ def handle_authentication(hardware: HardwareComponents) -> int: id of logged in user. """ while True: - hardware.send_message("Left button to login\nRight button to register") + _log_and_send(hardware, "Left button to login\nRight button to register") button = hardware.wait_for_button_press() if button == RIGHT_BUTTON: @@ -65,7 +68,8 @@ def _loop_action(hardware: HardwareComponents, action: Action) -> int: def _attempt_login(hardware: HardwareComponents) -> int: capturer = RaspCapturer() - hardware.send_message(f"Press left button to take photo\n{QUIT_INSTRUCTIONS}") + message = f"Press left button to take photo\n{QUIT_INSTRUCTIONS}" + _log_and_send(hardware, message) button_pressed = hardware.wait_for_button_press() if button_pressed == LEFT_BUTTON: @@ -86,10 +90,11 @@ def _attempt_register(hardware: HardwareComponents) -> int: # Capture NUM_FACES faces faces: list[np.ndarray] = [] for i in range(NUM_FACES): - hardware.send_message( + message = ( f"Press left button to take photo {i + 1}/{NUM_FACES}\n" f"{QUIT_INSTRUCTIONS}" ) + _log_and_send(hardware, message) button_pressed = hardware.wait_for_button_press() if button_pressed == RIGHT_BUTTON: @@ -100,13 +105,13 @@ def _attempt_register(hardware: HardwareComponents) -> int: faces.append(frame) # Try register faces - hardware.send_message("Registering face...") + _log_and_send(hardware, "Registering...") user_id = next_user_id() status = register_faces(user_id, faces) if status == Status.OK.value: create_user() - hardware.send_message("Registration successful!") + _log_and_send(hardware, "Registration successful!") return user_id _handle_status_message(hardware, status) @@ -120,4 +125,9 @@ def _is_status_id(status: int) -> bool: def _handle_status_message(hardware: HardwareComponents, status: int) -> None: if status in BAD_STATUS_MESSAGES: - hardware.send_message(BAD_STATUS_MESSAGES[status]) + _log_and_send(hardware, BAD_STATUS_MESSAGES[status]) + + +def _log_and_send(hardware: HardwareComponents, message: str) -> None: + logger.debug(message) + hardware.send_message(message) From 2f7c87c9c7edf1a0972eeeab96fb683e2737e103 Mon Sep 17 00:00:00 2001 From: Mitchell Clark Date: Sun, 6 Oct 2024 12:47:43 +1100 Subject: [PATCH 11/11] doc,ref: More specific docstrings and init_hardware in main guard --- client/drivers/data_structures.py | 3 ++- client/drivers/main.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index 07e4354..4919aa9 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -473,7 +473,7 @@ def send_message(self, message: str, message_time: int = 1) -> None: Args: message: Message to send to the user - message_time: Time to sleep for after displaying message. + message_time: Time (seconds) to sleep for after displaying message. """ self.display.fill(0) self.oled_display_text(message, 0, 0, 1) @@ -495,6 +495,7 @@ def wait_for_button_press(self) -> int: return RIGHT_BUTTON def _clear_buttons(self) -> None: + """Clear pressed status from all buttons.""" self.button0.was_pressed self.button1.was_pressed self.button0.was_double_pressed diff --git a/client/drivers/main.py b/client/drivers/main.py index 995b510..1118898 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -398,7 +398,7 @@ def handle_sniff_feedback(auspost: ControlledData) -> bool: ## LAUNCH -hardware = initialise_hardware() if __name__ == "__main__": + hardware = initialise_hardware() main()