Skip to content
This repository has been archived by the owner on Dec 8, 2024. It is now read-only.

Obliteration #90

Merged
merged 5 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 6 additions & 39 deletions client/drivers/data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,6 @@ def make_failed(cls) -> "ControlledData":

# SECTION: Getters/Setters

def DEBUG_get_next_posture_graph_value(self) -> int:
"""
Returns next thing to put on the DEBUG graph.

TODO: Remove this method
"""
return_me = self._DEBUG_current_graph_function(
self._DEBUG_current_graph_list_index
)
self._DEBUG_current_graph_list_index += 1
return return_me

def is_failed(self) -> bool:
"""
Returns True iff this ControlledData is failed.
Expand Down Expand Up @@ -183,31 +171,6 @@ def accept_new_posture_data(
for datum in posture_data:
self._posture_data.put_nowait(datum)

# SECTION: Posture data mapping

def get_cushion_posture_data(
self,
) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like
"""
Returns posture data necessary for cushion feedback.

TODO: Implement this.
"""
print("<!> WARNING: get_cushion_posture_data() not implemented!")
return None

def get_plant_posture_data(
self,
) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like
"""
Returns posture data necessary for plant feedback.

TODO: Implement this.
"""
print("<!> WARNING: get_plant_posture_data() not implemented!")
return None


class HardwareComponents:
"""
Hardware components packaged together into a class.
Expand Down Expand Up @@ -467,15 +430,19 @@ def oled_display_texts(self, texts: List[str], x: int, y: int, colour: int) -> i
)
return display_height_offset

def send_message(self, message: str, message_time: int = 1) -> None:
def send_message(self, messages: List[str], message_time: int = 1) -> None:
"""Clear the screen and display message

Args:
message: Message to send to the user
message_time: Time (seconds) to sleep for after displaying message.
"""
self.display.fill(0)
self.oled_display_text(message, 0, 0, 1)
display_height_offset = 0
for text in messages:
display_height_offset = self.oled_display_text(
text, 0, 0 + display_height_offset, 1
)
self.display.show()
time.sleep(message_time)

Expand Down
32 changes: 16 additions & 16 deletions client/drivers/login_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
Status.NO_MATCH.value: "Could not match face",
Status.ALREADY_REGISTERED.value: "Face already registered",
}
QUIT_INSTRUCTIONS = "Right button to quit"
QUIT_INSTRUCTIONS = "Right: quit"

Action = Callable[[HardwareComponents], int]

Expand All @@ -44,9 +44,9 @@ def handle_authentication(hardware: HardwareComponents) -> int:
while True:
_log_and_send(
hardware,
"Left button to login\n"
"Right button to register\n"
"Double press right button to reset data",
["Left: login",
"Right: register",
"Double press right: reset data"]
)
button = hardware.wait_for_button_press()

Expand Down Expand Up @@ -82,8 +82,8 @@ def _loop_action(hardware: HardwareComponents, action: Action) -> int:

def _attempt_login(hardware: HardwareComponents) -> int:
capturer = RaspCapturer()
message = f"Press left button to take photo\n{QUIT_INSTRUCTIONS}"
_log_and_send(hardware, message, message_time=0)
messages = ["Left: take photo", f"{QUIT_INSTRUCTIONS}"]
_log_and_send(hardware, messages, message_time=0)

button_pressed = hardware.wait_for_button_press()
if button_pressed == LEFT_BUTTON:
Expand All @@ -92,7 +92,7 @@ def _attempt_login(hardware: HardwareComponents) -> int:
if button_pressed == RIGHT_BUTTON:
return QUIT

_log_and_send(hardware, "Trying login...", message_time=0)
_log_and_send(hardware, ["Trying login..."], message_time=0)
status = get_face_match(face)
_handle_status_message(hardware, status)

Expand All @@ -105,11 +105,11 @@ def _attempt_register(hardware: HardwareComponents) -> int:
# Capture NUM_FACES faces
faces: list[np.ndarray] = []
for i in range(NUM_FACES):
message = (
f"Press left button to take photo {i + 1}/{NUM_FACES}\n"
messages = [
f"Left: take photo {i + 1}/{NUM_FACES}",
f"{QUIT_INSTRUCTIONS}"
)
_log_and_send(hardware, message, message_time=0)
]
_log_and_send(hardware, messages, message_time=0)

button_pressed = hardware.wait_for_button_press()
if button_pressed == RIGHT_BUTTON:
Expand All @@ -120,13 +120,13 @@ def _attempt_register(hardware: HardwareComponents) -> int:
faces.append(frame)

# Try register faces
_log_and_send(hardware, "Registering...")
_log_and_send(hardware, ["Registering..."])
user_id = next_user_id()
status = register_faces(user_id, faces)

if status == Status.OK.value:
create_user()
_log_and_send(hardware, "Registration successful!")
_log_and_send(hardware, ["Registration successful!"])
return user_id

_handle_status_message(hardware, status)
Expand All @@ -144,7 +144,7 @@ def _handle_status_message(hardware: HardwareComponents, status: int) -> None:


def _log_and_send(
hardware: HardwareComponents, message: str, message_time: int = 1
hardware: HardwareComponents, messages: list[str], message_time: int = 1
) -> None:
logger.debug(message)
hardware.send_message(message, message_time=message_time)
logger.debug(messages)
hardware.send_message(messages, message_time=message_time)
72 changes: 25 additions & 47 deletions client/drivers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
#: the plant will move down.
#: FIXME: Fine-tune this value later.
PLANT_PROPORTION_GOOD_THRESHOLD = 0.5
"""
Threshold for I. Jensen Plant Mover 10000 feedback. If the proportion of "good" sitting posture is below this,
the plant will move down.
FIXME: Fine-tune this value later.
"""

#: DEBUG Number of milliseconds between each loop iteration in do_everything().
DEBUG_DO_EVERYTHING_INTERVAL = 1000
Expand Down Expand Up @@ -169,8 +174,6 @@ def do_everything(auspost: ControlledData) -> None:
Requires:
! auspost.is_failed()
"""
print("<!> BEGIN do_everything()")

LOGIN_MESSAGE = "Logged in with user id: " + str(auspost.get_user_id())
LOGOUT_MESSAGE = "Logged out user id " + str(auspost.get_user_id())

Expand All @@ -197,7 +200,6 @@ def do_everything(auspost: ControlledData) -> None:
hardware.display.show()

while True:
# Loop invariant: ! auspost.is_failed()
# Check for user logout
if hardware.button0.was_pressed:
hardware.display.fill(0)
Expand All @@ -208,8 +210,7 @@ def do_everything(auspost: ControlledData) -> None:
return

update_display_screen(auspost)
# handle_posture_monitoring(auspost)
handle_posture_monitoring_new(auspost)
handle_posture_graph(auspost)
handle_feedback(auspost)

sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL)
Expand All @@ -235,8 +236,6 @@ def update_display_screen(auspost: ControlledData) -> bool:
Ensures:
! auspost.is_failed()
"""
print("<!> BEGIN update_display_screen()")

while (
not auspost.get_posture_data().empty()
): # NOTE: This is much more robust than getting a fixed number of things out of the queue
Expand All @@ -249,14 +248,25 @@ def update_display_screen(auspost: ControlledData) -> bool:
)
hardware.display.show()

print("<!> END update_display_screen()")
return True


def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
def handle_posture_graph(auspost: ControlledData) -> bool:
"""
Get a snapshot of the user's posture data.
Use this information to update the data for the posture graph.

print("<!> handle_posture_monitoring_new()")
Args:
(auspost : ControlledData): Data encapsulating the current state of the program.
Returns:
(bool): True, always. If you get a False return value, then something has gone VERY wrong.
Requires:
! auspost.is_failed()
Ensures:
! auspost.is_failed()

TODO: Check this
"""
now = datetime.now()

if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT:
Expand All @@ -268,14 +278,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
period_end=now,
)

# Exit if not enough data
# if len(recent_posture_data) <= POSTURE_GRAPH_DATUM_WIDTH:
# Exit if no data
if len(recent_posture_data) == 0:
print("<!> Exiting handle_posture_monitoring_new() early: Not enough data")
# auspost.set_last_snapshot_time(datetime.now())
return True

# Exit if not in frame enough
# Exit if person not in frame enough
average_prop_in_frame = sum(
[posture.prop_in_frame for posture in recent_posture_data]
) / len(recent_posture_data)
Expand All @@ -301,11 +309,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
# Calculate the interval length
interval = total_time / POSTURE_GRAPH_DATUM_WIDTH

# Setup a sublist each representing 1 pixel on the graph
# Setup sublists, where each sublist is a portion of the overall data
split_posture_lists: list[list[Posture]]
split_posture_lists = [[] for _ in range(POSTURE_GRAPH_DATUM_WIDTH)]

# Sublists will be split by period_start
# What is in each sublist is determined by period_start
# We want an approximately equal amount of data in each sublist
for posture in recent_posture_data:
index = min(
POSTURE_GRAPH_DATUM_WIDTH - 1,
Expand All @@ -322,44 +331,13 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
average_prop_good = sum(
[posture.prop_good for posture in posture_list]
) / len(posture_list)
# KILLME:
# print(f"<!> {average_prop_good=}")
# auspost.accept_new_posture_data([average_prop_good] * DEBUG_MULTIPLIER_CONSTANT) # DEBUG: 2024-10-06_20-16 Gabe: Fixed the typing by wrapping into a singleton list
# print(f"<!> Avg prop_good is {average_prop_good}")
new_prop_good_data += [average_prop_good] * POSTURE_GRAPH_DATUM_WIDTH
auspost.accept_new_posture_data(new_prop_good_data)

auspost.set_last_snapshot_time(now)

return True


# SECTION: Feedback handling
def handle_posture_monitoring(auspost: ControlledData) -> bool:
"""
Take a snapshot monitoring the user, and update the given ControlledData if necessary.

Args:
auspost: Data encapsulating the current state of the program.

Returns:
(bool): True, always. If you get a False return value, then something has gone VERY wrong.

Requires:
! auspost.is_failed()

Ensures:
! auspost.is_failed()
"""
print("<!> handle_posture_monitoring()")
now = datetime.now()
if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT:
auspost.accept_new_posture_data([])
auspost.accept_new_posture_data([auspost.DEBUG_get_next_posture_graph_value()])
auspost.set_last_snapshot_time(now)
return True


def handle_feedback(auspost: ControlledData) -> bool:
"""
Provide feedback to the user if necessary.
Expand Down