diff --git a/client/data/routines.py b/client/data/routines.py index 7aab146..1535027 100644 --- a/client/data/routines.py +++ b/client/data/routines.py @@ -93,6 +93,19 @@ def create_user() -> int: return user_id +def next_user_id() -> int: + """ + Returns: + The id that would be assigned to a new user if one was created + """ + with _connect() as connection: + cursor = connection.cursor() + result = cursor.execute("SELECT last_insert_rowid() FROM user;") + ids = result.fetchone() + last_user_id = 0 if len(ids) == 0 else ids[0] + return last_user_id + 1 + + def save_posture(posture: Posture) -> None: """Stores the posture record in the database. diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index 5955d99..4919aa9 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -4,11 +4,11 @@ File: sitting-desktop-garden/client/drivers/data_structures.py Author: - Gabriel Field (47484306) + Gabriel Field (47484306), Mitchell Clark """ ## SECTION: Imports - +import time from typing import List from PiicoDev_Switch import PiicoDev_Switch from PiicoDev_SSD1306 import * @@ -18,6 +18,7 @@ # DEBUG: from math import sin, pi + # :DEBUG @@ -26,10 +27,13 @@ """ Sentinel value for an invalid user. """ EMPTY_USER_ID = -1 +LEFT_BUTTON = 0 +RIGHT_BUTTON = 1 ## SECTION: ControlledData + class ControlledData: """ Data for passing around in client/drivers/main.do_everything(). @@ -37,7 +41,7 @@ class ControlledData: There should only ever be one object of this class at a time. Member fields: - self._failed : bool + self._failed : bool True if this data is incomplete. self._user_id : int ID of current user. @@ -57,19 +61,19 @@ class ControlledData: self._failed ==> (all other variables are default values) """ - _failed : bool + _failed: bool """True if this data is incomplete.""" - _user_id : int + _user_id: int """ID of current user.""" - _posture_data : Queue[float] + _posture_data: Queue[float] """Data updated through ML models, used for feedback.""" - _last_snapshot_time : datetime + _last_snapshot_time: datetime """Time of the last successful pull of posture data from the SQLite database""" - _last_cushion_time : datetime + _last_cushion_time: datetime """Time of the last successful cushion feedback event.""" - _last_plant_time : datetime + _last_plant_time: datetime """Time of the last successful plant feedback event.""" - _last_sniff_time : datetime + _last_sniff_time: datetime """Time of the last successful scent feedback event.""" # SECTION: Constructors @@ -78,32 +82,34 @@ def __init__(self): """ DO NOT USE THIS CONSTRUCTOR! Call ControlledData.make_empty() or ControlledData.make_failed() instead. """ - self._failed = True - self._user_id = EMPTY_USER_ID - self._posture_data = Queue() + self._failed = True + self._user_id = EMPTY_USER_ID + self._posture_data = Queue() self._last_snapshot_time = datetime.now() - self._last_cushion_time = datetime.now() - self._last_plant_time = datetime.now() - self._last_sniff_time = datetime.now() + self._last_cushion_time = datetime.now() + self._last_plant_time = datetime.now() + self._last_sniff_time = datetime.now() self._DEBUG_current_graph_list_index = 0 - self._DEBUG_current_graph_function = lambda x: 30 * (1 + sin(2 * pi * x / WIDTH)) + self._DEBUG_current_graph_function = lambda x: 30 * ( + 1 + sin(2 * pi * x / WIDTH) + ) @classmethod - def make_empty(cls, user_id : int) -> "ControlledData": + def make_empty(cls, user_id: int) -> "ControlledData": """ Construct a non-failed object of this class, with a provided user ID and empty posture data. - + Returns: (ControlledData): An object of this class that is not failed, with legal user ID and empty posture data. """ return_me = ControlledData() - return_me._failed = False - return_me._user_id = user_id - return_me._posture_data = Queue() + return_me._failed = False + return_me._user_id = user_id + return_me._posture_data = Queue() return_me._last_snapshot_time = datetime.now() - return_me._last_cushion_time = datetime.now() - return_me._last_plant_time = datetime.now() - return_me._last_sniff_time = datetime.now() + return_me._last_cushion_time = datetime.now() + return_me._last_plant_time = datetime.now() + return_me._last_sniff_time = datetime.now() print(" Made a new empty ControlledData() with user_id", return_me._user_id) return return_me @@ -111,18 +117,18 @@ def make_empty(cls, user_id : int) -> "ControlledData": def make_failed(cls) -> "ControlledData": """ Construct a failed object of this class. - + Returns: (ControlledData): An object of this class that is failed. """ return_me = ControlledData() - return_me._failed = True - return_me._user_id = EMPTY_USER_ID - return_me._posture_data = Queue() + return_me._failed = True + return_me._user_id = EMPTY_USER_ID + return_me._posture_data = Queue() return_me._last_snapshot_time = datetime.now() - return_me._last_cushion_time = datetime.now() - return_me._last_plant_time = datetime.now() - return_me._last_sniff_time = datetime.now() + return_me._last_cushion_time = datetime.now() + return_me._last_plant_time = datetime.now() + return_me._last_sniff_time = datetime.now() return return_me # SECTION: Getters/Setters @@ -136,7 +142,9 @@ def DEBUG_get_next_posture_graph_value(self) -> int: TODO: Remove this method """ - return_me = self._DEBUG_current_graph_function(self._DEBUG_current_graph_list_index) + return_me = self._DEBUG_current_graph_function( + self._DEBUG_current_graph_list_index + ) self._DEBUG_current_graph_list_index += 1 return return_me @@ -146,7 +154,7 @@ def is_failed(self) -> bool: (bool): True iff this ControlledData is failed. """ return self._failed - + def get_user_id(self) -> int: """ Returns: @@ -167,8 +175,8 @@ def get_last_snapshot_time(self) -> datetime: (datetime): The last time that the internal posture data was updated. """ return self._last_snapshot_time - - def set_last_snapshot_time(self, time : datetime) -> None: + + def set_last_snapshot_time(self, time: datetime) -> None: """ Args: time : datetime @@ -183,14 +191,14 @@ def get_last_cushion_time(self) -> datetime: """ return self._last_cushion_time - def set_last_cushion_time(self, time : datetime) -> None: + def set_last_cushion_time(self, time: datetime) -> None: """ Args: time : datetime The last time that the user was provided cushion feedback. """ self._last_cushion_time = time - + def get_last_plant_time(self) -> datetime: """ Returns: @@ -198,14 +206,14 @@ def get_last_plant_time(self) -> datetime: """ return self._last_plant_time - def set_last_plant_time(self, time : datetime) -> None: + def set_last_plant_time(self, time: datetime) -> None: """ Args: time : datetime The last time that the user was provided plant feedback. """ self._last_plant_time = time - + def get_last_sniff_time(self) -> datetime: """ Returns: @@ -213,22 +221,24 @@ def get_last_sniff_time(self) -> datetime: """ return self._last_cushion_time - def set_last_sniff_time(self, time : datetime) -> None: + def set_last_sniff_time(self, time: datetime) -> None: """ Args: time : datetime The last time that the user was provided olfactory feedback. """ self._last_sniff_time = time - - def accept_new_posture_data(self, posture_data : List[float]) -> None: # TODO: Refine type signature + + def accept_new_posture_data( + self, posture_data: List[float] + ) -> None: # TODO: Refine type signature """ Update the internal store of posture data for the OLED display. Args: posture_data : List[float] New posture data to accept and merge with the current state of this object. - + TODO: Implement me! """ # DEBUG: @@ -237,10 +247,11 @@ def accept_new_posture_data(self, posture_data : List[float]) -> None: # TODO: R for datum in posture_data: self._posture_data.put_nowait(datum) - # SECTION: Posture data mapping - - def get_cushion_posture_data(self) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like + + def get_cushion_posture_data( + self, + ) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like """ Returns: (CUSHION_POSTURE_DATA): Posture data necessary for cushion feedback. @@ -251,7 +262,9 @@ def get_cushion_posture_data(self) -> "CUSHION_POSTURE_DATA": # TODO: Decide wha # :DEBUG return None - def get_plant_posture_data(self) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like + def get_plant_posture_data( + self, + ) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like """ Returns: (PLANT_POSTURE_DATA) Posture data necessary for plant feedback. @@ -261,8 +274,10 @@ def get_plant_posture_data(self) -> "PLANT_POSTURE_DATA": # TODO: Decide what th print(" WARNING: get_plant_posture_data() not implemented!") # :DEBUG return None - - def get_sniff_posture_data(self) -> "SNIFF_POSTURE_DATA": # TODO: Decide what this type looks like + + def get_sniff_posture_data( + self, + ) -> "SNIFF_POSTURE_DATA": # TODO: Decide what this type looks like """ Returns: (SNIFF_POSTURE_DATA): Posture data necessary for scent feedback. @@ -274,9 +289,9 @@ def get_sniff_posture_data(self) -> "SNIFF_POSTURE_DATA": # TODO: Decide what th return None - ## SECTION: Hardware packaged together + class HardwareComponents: """ Hardware components packaged together into a class. @@ -293,46 +308,50 @@ class HardwareComponents: Should get initialised ONCE THE USER IS LOGGED IN because the graph will look different for each user. """ - button0 : PiicoDev_Switch + button0: PiicoDev_Switch """A button with address switches set to [0, 0, 0, 0]""" - button1 : PiicoDev_Switch + button1: PiicoDev_Switch """A button with address switches set to [0, 0, 0, 1]""" - display : PiicoDev_SSD1306 + display: PiicoDev_SSD1306 """OLED SSD1306 Display with default address""" - posture_graph : PiicoDev_SSD1306.graph2D | None + posture_graph: PiicoDev_SSD1306.graph2D | None """ Graph object for rendering on self.display. NOT INITIALISED by default; i.e. None until initialised. Should get initialised ONCE THE USER IS LOGGED IN because the graph will look different for each user. """ - posture_graph_from : int | None + posture_graph_from: int | None """y-coordinate from which the posture graph begins, or `None` if no posture graph is active.""" # SECTION: Constructors - + @classmethod def make_fresh(cls): """ Create a new instance of HardwareComponents, set up according to the hardware that we expect to be plugged in. """ - DOUBLE_PRESS_DURATION = 400 # Milliseconds + DOUBLE_PRESS_DURATION = 400 # Milliseconds return HardwareComponents( - PiicoDev_Switch(id = [0, 0, 0, 0], double_press_duration = DOUBLE_PRESS_DURATION), # WARNING: 2024-09-01 17:12 Gabe: I think this produces an "I2C is not enabled" warning. No idea why. - PiicoDev_Switch(id = [0, 0, 0, 1], double_press_duration = DOUBLE_PRESS_DURATION), # WARNING: 2024-09-01 17:12 Gabe: I think this produces an "I2C is not enabled" warning. No idea why. - create_PiicoDev_SSD1306() # This is the constructor; ignore the "is not defined" error message. + PiicoDev_Switch( + id=[0, 0, 0, 0], double_press_duration=DOUBLE_PRESS_DURATION + ), # WARNING: 2024-09-01 17:12 Gabe: I think this produces an "I2C is not enabled" warning. No idea why. + PiicoDev_Switch( + id=[0, 0, 0, 1], double_press_duration=DOUBLE_PRESS_DURATION + ), # WARNING: 2024-09-01 17:12 Gabe: I think this produces an "I2C is not enabled" warning. No idea why. + create_PiicoDev_SSD1306(), # This is the constructor; ignore the "is not defined" error message. ) def __init__(self, button0, button1, display): - self.button0 : PiicoDev_Switch = button0 - self.button1 : PiicoDev_Switch = button1 - self.display : PiicoDev_SSD1306 = display - self.posture_graph : PiicoDev_SSD1306.graph2D | None = None - self.posture_graph_from : int | None = None - + self.button0: PiicoDev_Switch = button0 + self.button1: PiicoDev_Switch = button1 + self.display: PiicoDev_SSD1306 = display + self.posture_graph: PiicoDev_SSD1306.graph2D | None = None + self.posture_graph_from: int | None = None + # SECTION: Setters # 2024-09-02 14-45 Gabe: TESTED. - def get_control_messages(self, user_id : int) -> List[str]: + def get_control_messages(self, user_id: int) -> List[str]: """ Get messages to display during usual application loop. TODO: Finalise these! @@ -340,7 +359,7 @@ def get_control_messages(self, user_id : int) -> List[str]: Args: user_id : int ID of the currently logged-in user. - + Returns: (List[str]): The messages to display to the user during the main application loop. @@ -348,7 +367,7 @@ def get_control_messages(self, user_id : int) -> List[str]: return ["b0: logout", "id: " + str(user_id)] # 2024-09-13 08-31 Gabe: TESTED. - def initialise_posture_graph(self, user_id : int) -> None: + def initialise_posture_graph(self, user_id: int) -> None: """ Initialise self.posture_graph according to the provided user_id. @@ -358,31 +377,42 @@ def initialise_posture_graph(self, user_id : int) -> None: """ CONTROL_MESSAGES = self.get_control_messages(user_id) GRAPH_MIN_VALUE = 0 - GRAPH_MAX_VALUE = 60 # TODO: 2024-09-02 07-53 Gabe: - # This needs to be a real value for the underlying data that we expect to be shown. - # From memory, this is probably `60` for "number of the last 60 seconds spent sitting well" - LINE_HEIGHT = 15 # pixels - LINE_WIDTH = 16 # characters - + GRAPH_MAX_VALUE = 60 # TODO: 2024-09-02 07-53 Gabe: + # This needs to be a real value for the underlying data that we expect to be shown. + # From memory, this is probably `60` for "number of the last 60 seconds spent sitting well" + LINE_HEIGHT = 15 # pixels + LINE_WIDTH = 16 # characters + # The posture graph will occupy space from the bottom (y = HEIGHT - 1) up to initialisation_y_value. flatten = lambda xss: [x for xs in xss for x in xs] - it = flatten(map( - (lambda me: [me[i : i + LINE_WIDTH] for i in range(0, len(CONTROL_MESSAGES), LINE_WIDTH)]), - CONTROL_MESSAGES - )) + it = flatten( + map( + ( + lambda me: [ + me[i : i + LINE_WIDTH] + for i in range(0, len(CONTROL_MESSAGES), LINE_WIDTH) + ] + ), + CONTROL_MESSAGES, + ) + ) initialisation_y_value = len(it) * LINE_HEIGHT self.posture_graph_from = initialisation_y_value self.posture_graph = self.display.graph2D( - originX = 0, originY = HEIGHT - 1, - width = WIDTH, height = HEIGHT - initialisation_y_value, - minValue = GRAPH_MIN_VALUE, maxValue = GRAPH_MAX_VALUE, - c = 1, bars = False + originX=0, + originY=HEIGHT - 1, + width=WIDTH, + height=HEIGHT - initialisation_y_value, + minValue=GRAPH_MIN_VALUE, + maxValue=GRAPH_MAX_VALUE, + c=1, + bars=False, ) - + # SECTION: Using peripherals # 2024-09-01 16:57 Gabe: TESTED. - def oled_display_text(self, text : str, x : int, y : int, colour : int = 1) -> int: + def oled_display_text(self, text: str, x: int, y: int, colour: int = 1) -> int: """ Display text on the oled display, wrapping lines if necessary. NOTE: Does not blank display. Call `.display.fill(0)` if needed. @@ -399,19 +429,19 @@ def oled_display_text(self, text : str, x : int, y : int, colour : int = 1) -> i 0: black 1: white Defaults to 1: white. - + Returns: (int): The y-value at which any subsequent lines should start printing from. """ - LINE_HEIGHT = 15 # pixels - LINE_WIDTH = 16 # characters + LINE_HEIGHT = 15 # pixels + LINE_WIDTH = 16 # characters chunks = [text[i : i + LINE_WIDTH] for i in range(0, len(text), LINE_WIDTH)] - for (index, chunk) in enumerate(chunks): + for index, chunk in enumerate(chunks): self.display.text(chunk, x, y + index * LINE_HEIGHT, colour) return y + len(chunks) * LINE_HEIGHT - + # 2024-09-01 17:12 Gabe: TESTED. - def oled_display_texts(self, texts: List[str], x : int, y : int, colour : int) -> int: + def oled_display_texts(self, texts: List[str], x: int, y: int, colour: int) -> int: """ Display many lines of text on the oled display, wrapping lines if necessary. NOTE: Does not blank display. Call `.display.fill(0)` if needed. @@ -427,110 +457,46 @@ def oled_display_texts(self, texts: List[str], x : int, y : int, colour : int) - colour : int 0: black 1: white - + Returns: (int): The y-value at which any subsequent lines should start printing from. """ display_height_offset = 0 for text in texts: - display_height_offset = self.oled_display_text(text, x, y + display_height_offset, colour) + display_height_offset = self.oled_display_text( + text, x, y + display_height_offset, colour + ) return display_height_offset + def send_message(self, message: str, message_time: int = 1) -> None: + """Clear the screen and display message - -## SECTION: Picture - -class Picture: - """ - A picture, which may have failed. - - Member fields: - self.failed : bool - True iff this picture is incomplete. - self.underlying_picture : (TODO: Figure out this type!) - The picture encoded by this object. - """ - - def __init__(self) -> "Picture": - self.failed : bool = True - self.underlying_picture : "UNDERLYING_PICTURE" = None - - @classmethod - def make_failed(cls) -> "Picture": - """ - Make a failed Picture. - """ - return_me = Picture() - return_me.failed = True - return_me.underlying_picture = None - return return_me - - @classmethod - def make_valid(cls, underlying_picture : "UNDERLYING_PICTURE") -> "Picture": - """ - Make a valid Picture. - """ - return_me = Picture() - return_me.failed = False - return_me.underlying_picture = underlying_picture - return return_me - - - -## SECTION: Face - -class Face: - """ - A potentially recognised face, which may have failed to match or failed to run. - - Member fields: - self.failed : bool - True iff the facial recognition model failed - self.matched : bool - True iff the facial recognition model matched a face - self.user_id : int - if not self.failed and self.matched, then this string will contain the user - id of the matched user - """ - - def __init__(self) -> "Face": - self.failed : bool = True - self.matched : bool = False - self.user_id : int = None - - @classmethod - def make_failed(cls) -> "Face": - """ - Make a failed Face. - """ - return_me = Face() - return_me.failed = True - return_me.matched = False - return_me.user_id = None - return return_me - - @classmethod - def make_unmatched(cls) -> "Face": - """ - Make an unmatched Face. + Args: + message: Message to send to the user + message_time: Time (seconds) to sleep for after displaying message. """ - return_me = Face() - return_me.failed = False - return_me.matched = False - return_me.user_id = None - return return_me + self.display.fill(0) + self.oled_display_text(message, 0, 0, 1) + self.display.show() + time.sleep(message_time) - @classmethod - def make_matched(cls, user_id : int) -> "Face": - """ - Make a matched face. + def wait_for_button_press(self) -> int: + """Waits for a button to be pressed and then returns the button number. - Args: - user_id : int - The matched user id - """ - return_me = Face() - return_me.failed = False - return_me.matched = True - return_me.user_id = user_id - return return_me + Returns: + The number of the button pressed. + """ + self._clear_buttons() + while True: + if self.button0.was_pressed: + return LEFT_BUTTON + + if self.button1.was_pressed: + return RIGHT_BUTTON + + def _clear_buttons(self) -> None: + """Clear pressed status from all buttons.""" + self.button0.was_pressed + self.button1.was_pressed + self.button0.was_double_pressed + self.button1.was_double_pressed diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index 386e3ca..76eb1fd 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -2,14 +2,132 @@ Login system """ +import logging +from typing import Callable -def handle_authentication() -> int: - pass +import numpy as np +from data.routines import next_user_id, create_user +from models.pose_detection.frame_capturer import RaspCapturer +from models.face_recognition.recognition import register_faces, get_face_match, Status +from drivers.data_structures import HardwareComponents, LEFT_BUTTON, RIGHT_BUTTON -def handle_register() -> int: - pass +NUM_FACES = 5 +QUIT = -4 +BAD_STATUS_MESSAGES = { + Status.NO_FACES.value: "No face detected please", + Status.TOO_MANY_FACES.value: "Too many faces detected", + Status.NO_MATCH.value: "Could not match face", +} +QUIT_INSTRUCTIONS = "Right button to quit" +Action = Callable[[HardwareComponents], int] -def handle_login() -> int: - pass +logger = logging.getLogger(__name__) + + +def handle_authentication(hardware: HardwareComponents) -> int: + """Run authentication loop until user either registers or logs in. + + Args: + hardware: connected RPI hardware + + Returns: + id of logged in user. + """ + while True: + _log_and_send(hardware, "Left button to login\nRight button to register") + button = hardware.wait_for_button_press() + + if button == RIGHT_BUTTON: + status = _loop_action(hardware, _attempt_register) + + if button == LEFT_BUTTON: + status = _loop_action(hardware, _attempt_login) + + if _is_status_id(status): + return status + + if status != QUIT: + error = ValueError(f"Did not expect status: {status}") + hardware.send_message(str(error)) + raise error + + +def _loop_action(hardware: HardwareComponents, action: Action) -> int: + """Loop action until appropriate status is returned""" + while True: + status = action(hardware) + + if status == QUIT: + return QUIT + + if _is_status_id(status): + return status + + +def _attempt_login(hardware: HardwareComponents) -> int: + capturer = RaspCapturer() + message = f"Press left button to take photo\n{QUIT_INSTRUCTIONS}" + _log_and_send(hardware, message) + + button_pressed = hardware.wait_for_button_press() + if button_pressed == LEFT_BUTTON: + face, _ = capturer.get_frame() + + if button_pressed == RIGHT_BUTTON: + return QUIT + + status = get_face_match(face) + _handle_status_message(hardware, status) + + return status + + +def _attempt_register(hardware: HardwareComponents) -> int: + capturer = RaspCapturer() + + # Capture NUM_FACES faces + faces: list[np.ndarray] = [] + for i in range(NUM_FACES): + message = ( + f"Press left button to take photo {i + 1}/{NUM_FACES}\n" + f"{QUIT_INSTRUCTIONS}" + ) + _log_and_send(hardware, message) + + button_pressed = hardware.wait_for_button_press() + if button_pressed == RIGHT_BUTTON: + return QUIT + + if button_pressed == LEFT_BUTTON: + frame, _ = capturer.get_frame() + faces.append(frame) + + # Try register faces + _log_and_send(hardware, "Registering...") + user_id = next_user_id() + status = register_faces(user_id, faces) + + if status == Status.OK.value: + create_user() + _log_and_send(hardware, "Registration successful!") + return user_id + + _handle_status_message(hardware, status) + + return status + + +def _is_status_id(status: int) -> bool: + return status > 0 + + +def _handle_status_message(hardware: HardwareComponents, status: int) -> None: + if status in BAD_STATUS_MESSAGES: + _log_and_send(hardware, BAD_STATUS_MESSAGES[status]) + + +def _log_and_send(hardware: HardwareComponents, message: str) -> None: + logger.debug(message) + hardware.send_message(message) diff --git a/client/drivers/main.py b/client/drivers/main.py index 43112fe..1118898 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -4,11 +4,11 @@ File: sitting-desktop-garden/client/drivers/main.py Author: - Gabriel Field (47484306) + Gabriel Field (47484306), Mitchell Clark """ ## SECTION: Imports - +import logging from PiicoDev_Unified import sleep_ms from PiicoDev_Switch import * @@ -16,13 +16,12 @@ import RPi.GPIO as GPIO -from typing import Tuple -import threading from datetime import datetime, timedelta -from data_structures import ControlledData, HardwareComponents, Picture, Face +from drivers.data_structures import ControlledData, HardwareComponents from ai_bros import * from data.routines import * +from drivers.login_system import handle_authentication ## SECTION: Global constants @@ -70,6 +69,7 @@ DEBUG_DO_EVERYTHING_INTERVAL = 1000 """ DEBUG Number of milliseconds between each loop iteration in do_everything(). """ +logger = logging.getLogger(__name__) ## SECTION: main() @@ -78,23 +78,16 @@ def main(): """ Entry point for the control program. """ - # DEBUG: - print(" main()") - # :DEBUG - - global hardware - hardware = initialise_hardware() + logging.basicConfig(level=logging.DEBUG) + logger.debug("Running main") init_database() - # Top level control flow while True: - wait_for_login_attempt() - main_data = attempt_login() - if main_data.is_failed(): - continue - print(" main(): Successful login") - do_everything(main_data) + user_id = handle_authentication(hardware) + user = ControlledData.make_empty(user_id) + logger.debug("Login successful") + do_everything(user) ## SECTION: Hardware initialisation @@ -129,176 +122,6 @@ def initialise_hardware() -> HardwareComponents: return return_me -## SECTION: Login handling - - -# 2024-09-01_15-52 Gabe: TESTED. -def wait_for_login_attempt() -> bool: - """ - Waits until the user attempts to log in. - - Returns: - (bool): True when the user attempts to log in. - """ - print(" BEGIN wait_for_login_attempt()") - - WAIT_FOR_LOGIN_OLED_MESSAGE = "Press button0 to log in!" - # Display to screen - hardware.display.fill(0) - hardware.oled_display_text(WAIT_FOR_LOGIN_OLED_MESSAGE, 0, 0, 1) - hardware.display.show() - # Clear button queue - hardware.button0.was_pressed - - while True: - if hardware.button0.was_pressed: - # Clear the display - hardware.display.fill(0) - hardware.display.show() - print(" END wait_for_login_attempt()") # DEBUG - return True - sleep_ms(WAIT_FOR_LOGIN_POLLING_INTERVAL) - - -# 2024-09-01 17:06 Gabe: TESTED., assuming ai_bros_face_recogniser() does what it should do. -def attempt_login() -> ControlledData: - """ - Attempts to log in. - - Returns: - (ControlledData): which is: - FAILED if the login is unsuccessful - EMPTY (but not failed) if the login is successful - """ - - # TODO: Finalise these messages - SMILE_FOR_CAMERA_MESSAGE = "LIS: Smile for the camera!" - PICTURE_FAILED_MESSAGE = "LIS: Picture failed T-T" - AI_FAILED_MESSAGE = "LIS: Failed to determine user T-T" - LOGIN_TOTALLY_FAILED_MESSAGE = "LIS: Failed to log in T-T" - - hardware.display.fill(0) - hardware.oled_display_text(SMILE_FOR_CAMERA_MESSAGE, 0, 0, 1) - hardware.display.show() - sleep_ms(START_LOGIN_ATTEMPTS_DELAY) - - while True: - hardware.display.fill(0) - hardware.oled_display_text(SMILE_FOR_CAMERA_MESSAGE, 0, 0, 1) - hardware.display.show() - picture = take_picture() - if picture.failed: - print(" Picture Failed") # DEBUG - hardware.display.fill(0) - hardware.oled_display_text(PICTURE_FAILED_MESSAGE, 0, 0, 1) - hardware.display.show() - sleep_ms(LOGIN_TAKE_PICTURE_INTERVAL) - continue - face = ai_bros_face_recogniser( - picture.underlying_picture - ) # TODO: This should be an external API call. - if face.failed: - print(" AI has failed us") # DEBUG - hardware.display.fill(0) - hardware.oled_display_text(AI_FAILED_MESSAGE, 0, 0, 1) - hardware.display.show() - sleep_ms(LOGIN_TAKE_PICTURE_INTERVAL) - continue - if face.matched: - print(" Mega W for AI") # DEBUG - return ControlledData.make_empty(face.user_id) - elif ask_create_new_user(): - return ControlledData.make_empty( - create_new_user(picture.underlying_picture) - ) - # Tell the user the login failed - print(" attempt_login(): Totally failed lol") # DEBUG - hardware.display.fill(0) - hardware.oled_display_text(LOGIN_TOTALLY_FAILED_MESSAGE, 0, 0, 1) - hardware.display.show() - sleep_ms(FAIL_LOGIN_DELAY) - return ControlledData.make_failed() - - -def take_picture() -> Picture: - """ - Takes a picture from the camera, and returns a (failable) picture object. - - TODO: Actually write this function. Currently prints a debug message and returns a failed picture. - """ - # DEBUG: - print(" take_picture()") - DEBUG_return_value = Picture.make_valid("DEBUG_picture_goes_here") - # :DEBUG - return DEBUG_return_value - - -# 2024-09-01 17:06 Gabe: TESTED. -def ask_create_new_user() -> bool: - """ - Ask the user whether they would like to create a new user profile based on the previous picture. - - Returns: - (bool): True iff the user would like to create a new user profile - TODO: Make this go out to hardware peripherals. It should have: - Two buttons (yes / no) - The LED display ("Unmatched face. Create new user?") - """ - print(" BEGIN ask_create_new_user()") - - CREATE_NEW_USER_MESSAGES = [ - "No face matched.", - "Create new user?", - "button0: no", - "button1: yes", - ] - # Display to screen - hardware.display.fill(0) - hardware.oled_display_texts(CREATE_NEW_USER_MESSAGES, 0, 0, 1) - hardware.display.show() - # Clear button queue - hardware.button0.was_pressed - hardware.button1.was_pressed - - while True: - if hardware.button0.was_pressed: - # Clear the display - hardware.display.fill(0) - hardware.display.show() - print(" END ask_create_new_user(): do NOT create new user") # DEBUG - return False - if hardware.button1.was_pressed: - # Clear the display - hardware.display.fill(0) - hardware.display.show() - print(" END ask_create_new_user(): DO create new user") # DEBUG - return True - sleep_ms(ASK_CREATE_NEW_USER_POLLING_INTERVAL) - - -def create_new_user(underlying_picture: int) -> int: - """ - Create a new user based on the given picture, and return their user id. - - Args: - underlying_picture : UNDERLYING_PICTURE - Picture to associate with the new user profile - Returns: - (int): The new user's user id - TODO: Actually write to the local SQLite database, and properly determine the new user id. - """ - # DEBUG: - DEBUG_new_user_id = 0 - # new_user_id = create_user() - # try: - # register_faces(new_user_id, [underlying_picture]) - # except NotImplementedError: - # pass - # :DEBUG - return DEBUG_new_user_id - # return new_user_id # DEBUG - - ## SECTION: Control for the logged-in user @@ -577,4 +400,5 @@ def handle_sniff_feedback(auspost: ControlledData) -> bool: ## LAUNCH if __name__ == "__main__": + hardware = initialise_hardware() main()